summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-10 17:42:22 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-10 17:42:22 +0000
commit0ee8b2f49415b04440410312ffe4ca0071c5a7b4 (patch)
treec02f29120e38eccf0cafddad64947c50d8a689aa /src
parent67c99b6f1b7e8baf1acce3e0db9f6769e67e28a7 (diff)
parentfdc3bb7b6720182ad80410d9473ee8cb29f0235c (diff)
downloadrabbitmq-server-git-0ee8b2f49415b04440410312ffe4ca0071c5a7b4.tar.gz
Merging bug23665 into default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_queue_index.erl26
-rw-r--r--src/rabbit_variable_queue.erl42
3 files changed, 47 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f4ea6ba802..80dc651a11 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -445,7 +445,8 @@ record_confirm_message(#delivery{sender = ChPid,
State =
#q{guid_to_channel = GTC,
q = #amqqueue{durable = true}}) ->
- {confirm, State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
+ {confirm,
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
record_confirm_message(_Delivery, State) ->
{no_confirm, State}.
@@ -463,8 +464,9 @@ attempt_delivery(#delivery{txn = none,
message = Message,
msg_seq_no = MsgSeqNo},
{NeedsConfirming, State = #q{backing_queue = BQ}}) ->
+ %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
case {NeedsConfirming, MsgSeqNo} of
- {_, undefined} -> ok;
+ {_, undefined} -> ok;
{no_confirm, _} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
{confirm, _} -> ok
end,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76c0a4ef86..2162104fb0 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -33,7 +33,7 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
+ publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -297,11 +297,12 @@ deliver(SeqIds, State) ->
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
-sync([], State) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+%% This is only called when there are outstanding confirms and the
+%% queue is idle.
+sync(State = #qistate { unsynced_guids = Guids }) ->
+ sync_if([] =/= Guids, State).
+
+sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% the variable queue publishes and acks to the qi, and then
%% syncs, all in one operation, there is no possibility of the
%% seqids not being in the journal, provided the transaction isn't
- %% emptied (handled above anyway).
- ok = file_handle_cache:sync(JournalHdl),
- notify_sync(State).
+ %% emptied (handled by sync_if anyway).
+ sync_if([] =/= SeqIds, State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+sync_if(false, State) ->
+ State;
+sync_if(_Bool, State = #qistate { journal_handle = undefined }) ->
+ State;
+sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
State #qistate { unsynced_guids = [] }.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 19d7c5761a..18423dd7b0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -535,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- unconfirmed = Unconfirmed1 }))}.
+ unconfirmed = UC1 }))}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -809,17 +809,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State),
- Res;
-needs_idle_timeout(_State) ->
- true.
+needs_idle_timeout(State = #vqstate { on_sync = OnSync, unconfirmed = UC }) ->
+ case {OnSync, gb_sets:is_empty(UC)} of
+ {?BLANK_SYNC, true} ->
+ {Res, _State} = reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State),
+ Res;
+ _ ->
+ true
+ end.
-idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
+idle_timeout(State) ->
+ a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1232,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
persistent_count = PCount,
durable = IsDurable,
ram_msg_count = RamMsgCount,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
@@ -1242,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
ram_msg_count = RamMsgCount + 1,
- unconfirmed = Unconfirmed1 }}.
+ unconfirmed = UC1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
@@ -1386,6 +1391,11 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
+confirm_commit_index(State = #vqstate { unconfirmed = [] }) ->
+ State;
+confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }.
+
remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->