summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_variable_queue.erl79
4 files changed, 69 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d23cbd1923..f0d12ae5de 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,7 +41,7 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/4,
+-export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/5,
tx_commit_vq_callback/1, flush_all/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -111,8 +111,8 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(tx_commit_msg_store_callback/4 ::
- (pid(), [message()], [acktag()], {pid(), any()}) -> 'ok').
+-spec(tx_commit_msg_store_callback/5 ::
+ (pid(), boolean(), [message()], [acktag()], {pid(), any()}) -> 'ok').
-spec(tx_commit_vq_callback/1 :: (pid()) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
@@ -362,9 +362,9 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 7, {unblock, ChPid}).
-tx_commit_msg_store_callback(QPid, Pubs, AckTags, From) ->
- gen_server2:pcast(QPid, 7,
- {tx_commit_msg_store_callback, Pubs, AckTags, From}).
+tx_commit_msg_store_callback(QPid, IsTransientPubs, Pubs, AckTags, From) ->
+ gen_server2:pcast(QPid, 7, {tx_commit_msg_store_callback,
+ IsTransientPubs, Pubs, AckTags, From}).
tx_commit_vq_callback(QPid) ->
gen_server2:pcast(QPid, 7, tx_commit_vq_callback).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c9add5b2d5..fa445c3ae5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -894,12 +894,15 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({tx_commit_msg_store_callback, Pubs, AckTags, From},
+handle_cast({tx_commit_msg_store_callback, IsTransientPubs, Pubs, AckTags, From},
State = #q{variable_queue_state = VQS}) ->
- noreply(
- State#q{variable_queue_state =
- rabbit_variable_queue:tx_commit_from_msg_store(
- Pubs, AckTags, From, VQS)});
+ {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_msg_store(
+ IsTransientPubs, Pubs, AckTags, From, VQS),
+ State1 = State#q{variable_queue_state = VQS1},
+ noreply(case RunQueue of
+ true -> run_message_queue(State1);
+ false -> State1
+ end);
handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) ->
noreply(
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 5257f20112..b37845d47d 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -352,6 +352,8 @@ write_acks(SeqIds, State) ->
add_to_journal(SeqId, ack, StateN)
end, State1, SeqIds)).
+sync_seq_ids([], State) ->
+ State;
sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) ->
State;
sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4a4ba999ff..8f813fb49c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -35,7 +35,7 @@
set_queue_ram_duration_target/2, remeasure_rates/1,
ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1,
delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2,
- tx_commit/4, tx_commit_from_msg_store/4, tx_commit_from_vq/1,
+ tx_commit/4, tx_commit_from_msg_store/5, tx_commit_from_vq/1,
needs_sync/1, flush_journal/1, status/1]).
%%----------------------------------------------------------------------------
@@ -241,9 +241,10 @@
-spec(tx_publish/2 :: (basic_message(), vqstate()) -> vqstate()).
-spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()).
-spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, vqstate()) ->
- {boolean(), vqstate()}).
--spec(tx_commit_from_msg_store/4 ::
- ([msg_id()], [ack()], {pid(), any()}, vqstate()) -> vqstate()).
+ {boolean(), vqstate()}).
+-spec(tx_commit_from_msg_store/5 ::
+ (boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) ->
+ {boolean(), vqstate()}).
-spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()).
-spec(needs_sync/1 :: (vqstate()) -> boolean()).
-spec(flush_journal/1 :: (vqstate()) -> vqstate()).
@@ -454,6 +455,8 @@ fetch(State =
index_state = IndexState1, len = Len1 }}
end.
+ack([], State) ->
+ State;
ack(AckTags, State = #vqstate { index_state = IndexState,
persistent_count = PCount,
persistent_store = PersistentStore }) ->
@@ -583,45 +586,69 @@ tx_rollback(Pubs, State = #vqstate { persistent_store = PersistentStore }) ->
end,
State.
-tx_commit(Pubs, AckTags, From, State = #vqstate { persistent_store = PersistentStore }) ->
- case persistent_msg_ids(Pubs) of
- [] ->
- {true, tx_commit_from_msg_store(Pubs, AckTags, From, State)};
- PersistentMsgIds ->
+tx_commit(Pubs, AckTags, From, State =
+ #vqstate { persistent_store = PersistentStore }) ->
+ %% If we are a non-durable queue, or we have no persistent pubs,
+ %% we can skip the msg_store loop.
+ PersistentMsgIds = persistent_msg_ids(Pubs),
+ IsTransientPubs = [] == PersistentMsgIds,
+ case IsTransientPubs orelse
+ ?TRANSIENT_MSG_STORE == PersistentStore of
+ true ->
+ tx_commit_from_msg_store(
+ IsTransientPubs, Pubs, AckTags, From, State);
+ false ->
Self = self(),
ok = rabbit_msg_store:sync(
- PersistentStore, PersistentMsgIds,
+ ?PERSISTENT_MSG_STORE, PersistentMsgIds,
fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback(
- Self, Pubs, AckTags, From)
+ Self, IsTransientPubs, Pubs, AckTags, From)
end),
{false, State}
end.
-tx_commit_from_msg_store(Pubs, AckTags, From,
- State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) ->
+tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
+ #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms},
+ persistent_store = PersistentStore }) ->
+ %% If we are a non-durable queue, or (no persisent pubs, and no
+ %% persistent acks) then we can skip the queue_index loop.
DiskAcks =
lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags),
- State #vqstate { on_sync = { [DiskAcks | SAcks],
- [Pubs | SPubs],
- [From | SFroms] }}.
+ case PersistentStore == ?TRANSIENT_MSG_STORE orelse
+ (IsTransientPubs andalso [] == DiskAcks) of
+ true -> State1 = tx_commit_from_vq(State #vqstate {
+ on_sync = {[], [Pubs], [From]} }),
+ {true, State1 #vqstate { on_sync = OnSync }};
+ false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks],
+ [Pubs | SPubs],
+ [From | SFroms] }}}
+ end.
tx_commit_from_vq(State = #vqstate { on_sync = {_, _, []} }) ->
State;
-tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) ->
- State1 = ack(lists:flatten(SAcks), State),
- {PubSeqIds, State2 = #vqstate { index_state = IndexState }} =
+tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
+ persistent_store = PersistentStore }) ->
+ Acks = lists:flatten(SAcks),
+ State1 = ack(Acks, State),
+ AckSeqIds = lists:foldl(fun ({ack_index_and_store, _MsgId,
+ SeqId, ?PERSISTENT_MSG_STORE}, SeqIdsAcc) ->
+ [SeqId | SeqIdsAcc];
+ (_, SeqIdsAcc) ->
+ SeqIdsAcc
+ end, [], Acks),
+ IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore,
+ {SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, StateN}) ->
{SeqId, StateN1} = publish(Msg, false, IsPersistent, StateN),
- SeqIdsAcc1 = case IsPersistent of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end,
- {SeqIdsAcc1, StateN1}
- end, {[], State1}, lists:flatten(lists:reverse(SPubs))),
+ {case IsPersistentStore andalso IsPersistent of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end, StateN1}
+ end, {AckSeqIds, State1}, lists:flatten(lists:reverse(SPubs))),
IndexState1 =
- rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState),
+ rabbit_queue_index:sync_seq_ids(SeqIds, IndexState),
[ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ],
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.