summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-16 17:34:34 +0000
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-16 17:34:34 +0000
commit06f5ec199e9314aa1de26c4d673e53699c4a3549 (patch)
treea0702ca00df51ea6b30bf72b094a1b1569cdaff0
parentc7f164e1725dfc0a1f122283ed4d0eb5183c1dd8 (diff)
parente2bec715642c14dcf92324849eb76739e1093af3 (diff)
downloadrabbitmq-server-git-06f5ec199e9314aa1de26c4d673e53699c4a3549.tar.gz
Merge default
-rw-r--r--src/file_handle_cache.erl13
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_queue_index.erl13
-rw-r--r--src/rabbit_variable_queue.erl36
4 files changed, 40 insertions, 29 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 59a0ab1cc0..f3b4dbafa2 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -143,9 +143,9 @@
-behaviour(gen_server2).
-export([register_callback/3]).
--export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
- current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3,
- set_maximum_since_use/1, delete/1, clear/1]).
+-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
+ truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
+ copy/3, set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
@@ -373,6 +373,13 @@ sync(Ref) ->
end
end).
+needs_sync(Ref) ->
+ with_handles(
+ [Ref],
+ fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false;
+ ([_Handle]) -> true
+ end).
+
position(Ref, NewOffset) ->
with_flushed_handles(
[Ref],
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 12cd0c93ff..34efbde84d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -23,6 +23,7 @@
-define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(IDLE_TIMEOUT, 10).
-define(BASE_MESSAGE_PROPERTIES,
#message_properties{expiry = undefined, needs_confirming = false}).
@@ -250,9 +251,9 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
confirm_messages(MsgIds, State#q{
backing_queue_state = BQS1}))),
case BQ:needs_timeout(BQS1) of
- false -> {stop_sync_timer(State1), hibernate};
- idle -> {stop_sync_timer(State1), 0 };
- timed -> {ensure_sync_timer(State1), 0 }
+ false -> {stop_sync_timer(State1), hibernate };
+ idle -> {stop_sync_timer(State1), ?IDLE_TIMEOUT};
+ timed -> {ensure_sync_timer(State1), 0 }
end.
backing_queue_module(#amqqueue{arguments = Args}) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4c8793f1ad..366e1b1be1 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -18,8 +18,8 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
- next_segment_boundary/1, bounds/1, recover/1]).
+ publish/5, deliver/2, ack/2, sync/1, sync/2, needs_sync/1, flush/1,
+ read/3, next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -285,8 +285,8 @@ ack(SeqIds, State) ->
%% This is only called when there are outstanding confirms and the
%% queue is idle.
-sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
- sync_if(not gb_sets:is_empty(MsgIds), State).
+sync(State) ->
+ sync_if(true, State).
sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack to
@@ -298,6 +298,11 @@ sync(SeqIds, State) ->
%% seqids not being in the journal.
sync_if([] =/= SeqIds, State).
+needs_sync(#qistate { journal_handle = undefined }) ->
+ false;
+needs_sync(#qistate { journal_handle = JournalHdl }) ->
+ file_handle_cache:needs_sync(JournalHdl).
+
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 52eb168a42..ea18ef2a2b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -731,21 +731,26 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State) ->
+needs_timeout(State = #vqstate { index_state = IndexState }) ->
case needs_index_sync(State) of
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end;
- true -> timed
+ true -> timed;
+ false -> case rabbit_queue_index:needs_sync(IndexState) of
+ true -> idle;
+ false -> false
+ end
end.
-timeout(State) ->
- a(reduce_memory_use(confirm_commit_index(State))).
+timeout(State = #vqstate { index_state = IndexState }) ->
+ IndexState1 = rabbit_queue_index:sync(IndexState),
+ State1 = State #vqstate { index_state = IndexState1 },
+ a(case reduce_memory_use(
+ fun (_Quota, State2) -> {0, State2} end,
+ fun (_Quota, State2) -> State2 end,
+ fun (_Quota, State2) -> {0, State2} end,
+ State) of
+ {true, _State} -> reduce_memory_use(State1);
+ {false, _State} -> State1
+ end).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1262,13 +1267,6 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
-confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
- case needs_index_sync(State) of
- true -> State #vqstate {
- index_state = rabbit_queue_index:sync(IndexState) };
- false -> State
- end.
-
record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC,