diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 79 |
4 files changed, 69 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d23cbd1923..f0d12ae5de 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,7 +41,7 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/4, +-export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/5, tx_commit_vq_callback/1, flush_all/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -111,8 +111,8 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(tx_commit_msg_store_callback/4 :: - (pid(), [message()], [acktag()], {pid(), any()}) -> 'ok'). +-spec(tx_commit_msg_store_callback/5 :: + (pid(), boolean(), [message()], [acktag()], {pid(), any()}) -> 'ok'). -spec(tx_commit_vq_callback/1 :: (pid()) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). @@ -362,9 +362,9 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 7, {unblock, ChPid}). -tx_commit_msg_store_callback(QPid, Pubs, AckTags, From) -> - gen_server2:pcast(QPid, 7, - {tx_commit_msg_store_callback, Pubs, AckTags, From}). +tx_commit_msg_store_callback(QPid, IsTransientPubs, Pubs, AckTags, From) -> + gen_server2:pcast(QPid, 7, {tx_commit_msg_store_callback, + IsTransientPubs, Pubs, AckTags, From}). tx_commit_vq_callback(QPid) -> gen_server2:pcast(QPid, 7, tx_commit_vq_callback). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c9add5b2d5..fa445c3ae5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -894,12 +894,15 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({tx_commit_msg_store_callback, Pubs, AckTags, From}, +handle_cast({tx_commit_msg_store_callback, IsTransientPubs, Pubs, AckTags, From}, State = #q{variable_queue_state = VQS}) -> - noreply( - State#q{variable_queue_state = - rabbit_variable_queue:tx_commit_from_msg_store( - Pubs, AckTags, From, VQS)}); + {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_msg_store( + IsTransientPubs, Pubs, AckTags, From, VQS), + State1 = State#q{variable_queue_state = VQS1}, + noreply(case RunQueue of + true -> run_message_queue(State1); + false -> State1 + end); handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) -> noreply( diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 5257f20112..b37845d47d 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -352,6 +352,8 @@ write_acks(SeqIds, State) -> add_to_journal(SeqId, ack, StateN) end, State1, SeqIds)). +sync_seq_ids([], State) -> + State; sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) -> State; sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4a4ba999ff..8f813fb49c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -35,7 +35,7 @@ set_queue_ram_duration_target/2, remeasure_rates/1, ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2, - tx_commit/4, tx_commit_from_msg_store/4, tx_commit_from_vq/1, + tx_commit/4, tx_commit_from_msg_store/5, tx_commit_from_vq/1, needs_sync/1, flush_journal/1, status/1]). %%---------------------------------------------------------------------------- @@ -241,9 +241,10 @@ -spec(tx_publish/2 :: (basic_message(), vqstate()) -> vqstate()). -spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()). -spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, vqstate()) -> - {boolean(), vqstate()}). --spec(tx_commit_from_msg_store/4 :: - ([msg_id()], [ack()], {pid(), any()}, vqstate()) -> vqstate()). + {boolean(), vqstate()}). +-spec(tx_commit_from_msg_store/5 :: + (boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) -> + {boolean(), vqstate()}). -spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()). -spec(needs_sync/1 :: (vqstate()) -> boolean()). -spec(flush_journal/1 :: (vqstate()) -> vqstate()). @@ -454,6 +455,8 @@ fetch(State = index_state = IndexState1, len = Len1 }} end. +ack([], State) -> + State; ack(AckTags, State = #vqstate { index_state = IndexState, persistent_count = PCount, persistent_store = PersistentStore }) -> @@ -583,45 +586,69 @@ tx_rollback(Pubs, State = #vqstate { persistent_store = PersistentStore }) -> end, State. -tx_commit(Pubs, AckTags, From, State = #vqstate { persistent_store = PersistentStore }) -> - case persistent_msg_ids(Pubs) of - [] -> - {true, tx_commit_from_msg_store(Pubs, AckTags, From, State)}; - PersistentMsgIds -> +tx_commit(Pubs, AckTags, From, State = + #vqstate { persistent_store = PersistentStore }) -> + %% If we are a non-durable queue, or we have no persistent pubs, + %% we can skip the msg_store loop. + PersistentMsgIds = persistent_msg_ids(Pubs), + IsTransientPubs = [] == PersistentMsgIds, + case IsTransientPubs orelse + ?TRANSIENT_MSG_STORE == PersistentStore of + true -> + tx_commit_from_msg_store( + IsTransientPubs, Pubs, AckTags, From, State); + false -> Self = self(), ok = rabbit_msg_store:sync( - PersistentStore, PersistentMsgIds, + ?PERSISTENT_MSG_STORE, PersistentMsgIds, fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback( - Self, Pubs, AckTags, From) + Self, IsTransientPubs, Pubs, AckTags, From) end), {false, State} end. -tx_commit_from_msg_store(Pubs, AckTags, From, - State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) -> +tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State = + #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms}, + persistent_store = PersistentStore }) -> + %% If we are a non-durable queue, or (no persisent pubs, and no + %% persistent acks) then we can skip the queue_index loop. DiskAcks = lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags), - State #vqstate { on_sync = { [DiskAcks | SAcks], - [Pubs | SPubs], - [From | SFroms] }}. + case PersistentStore == ?TRANSIENT_MSG_STORE orelse + (IsTransientPubs andalso [] == DiskAcks) of + true -> State1 = tx_commit_from_vq(State #vqstate { + on_sync = {[], [Pubs], [From]} }), + {true, State1 #vqstate { on_sync = OnSync }}; + false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks], + [Pubs | SPubs], + [From | SFroms] }}} + end. tx_commit_from_vq(State = #vqstate { on_sync = {_, _, []} }) -> State; -tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) -> - State1 = ack(lists:flatten(SAcks), State), - {PubSeqIds, State2 = #vqstate { index_state = IndexState }} = +tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, + persistent_store = PersistentStore }) -> + Acks = lists:flatten(SAcks), + State1 = ack(Acks, State), + AckSeqIds = lists:foldl(fun ({ack_index_and_store, _MsgId, + SeqId, ?PERSISTENT_MSG_STORE}, SeqIdsAcc) -> + [SeqId | SeqIdsAcc]; + (_, SeqIdsAcc) -> + SeqIdsAcc + end, [], Acks), + IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore, + {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, StateN}) -> {SeqId, StateN1} = publish(Msg, false, IsPersistent, StateN), - SeqIdsAcc1 = case IsPersistent of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, - {SeqIdsAcc1, StateN1} - end, {[], State1}, lists:flatten(lists:reverse(SPubs))), + {case IsPersistentStore andalso IsPersistent of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, StateN1} + end, {AckSeqIds, State1}, lists:flatten(lists:reverse(SPubs))), IndexState1 = - rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState), + rabbit_queue_index:sync_seq_ids(SeqIds, IndexState), [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. |
