diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-23 14:28:24 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-23 14:28:24 +0100 |
| commit | 9ab1135f64baa7f59fbc3cb2e6b43f2a1614c748 (patch) | |
| tree | c6a52d758cf8334aed12c004358d65091f2e1acc /src | |
| parent | 21cca2413fcf784f44058c207b59bbc92e519856 (diff) | |
| download | rabbitmq-server-git-9ab1135f64baa7f59fbc3cb2e6b43f2a1614c748.tar.gz | |
mainly cosmetic renamings. Also added support to queue_index:sync so that you can indicate whether or not the ack journal should be sync'd. Use of strace shows that in the test in bug 20470 #c6, we're doing 2 fsyncs per txn, which makes sense - one for the qi, and one for the msg_store. However, only getting about 180 txns/sec, as opposed to 350 as reported in that bug.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 15 |
4 files changed, 26 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 561e9e6954..d0a5f205a8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,7 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2, tx_commit_callback/4]). +-export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/4]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -107,8 +107,8 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(tx_commit_callback/4 :: (pid(), [message()], [acktag()], {pid(), any()}) - -> 'ok'). +-spec(tx_commit_msg_store_callback/4 :: (pid(), [message()], [acktag()], + {pid(), any()}) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -322,8 +322,9 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {unblock, ChPid}). -tx_commit_callback(QPid, Pubs, AckTags, From) -> - gen_server2:pcast(QPid, 8, {tx_commit_callback, Pubs, AckTags, From}). +tx_commit_msg_store_callback(QPid, Pubs, AckTags, From) -> + gen_server2:pcast(QPid, 8, + {tx_commit_msg_store_callback, Pubs, AckTags, From}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e2477e988b..434652a58b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -788,12 +788,13 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({tx_commit_callback, Pubs, AckTags, From}, +handle_cast({tx_commit_msg_store_callback, Pubs, AckTags, From}, State = #q{variable_queue_state = VQS}) -> noreply( run_message_queue( State#q{variable_queue_state = - rabbit_variable_queue:do_tx_commit(Pubs, AckTags, From, VQS)})); + rabbit_variable_queue:tx_commit_from_msg_store( + Pubs, AckTags, From, VQS)})); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 7259eaa23f..67637ed2d7 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,7 +32,7 @@ -module(rabbit_queue_index). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/2, + write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/3, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). @@ -217,9 +217,13 @@ full_flush_journal(State) -> {false, State1} -> State1 end. -sync_seq_ids(SeqIds, State) -> - {Hdl, State1} = get_journal_handle(State), - ok = file_handle_cache:sync(Hdl), +sync_seq_ids(SeqIds, SyncAckJournal, State) -> + State1 = case SyncAckJournal of + true -> {Hdl, State2} = get_journal_handle(State), + ok = file_handle_cache:sync(Hdl), + State2; + false -> State + end, SegNumsSet = lists:foldl( fun (SeqId, Set) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4dce1f6a64..cb2bdca718 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/1, terminate/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, - requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]). + requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4]). %%---------------------------------------------------------------------------- @@ -367,26 +367,29 @@ tx_rollback(Pubs, State) -> tx_commit(Pubs, AckTags, From, State) -> case persistent_msg_ids(Pubs) of [] -> - {true, do_tx_commit(Pubs, AckTags, From, State)}; + {true, tx_commit_from_msg_store(Pubs, AckTags, From, State)}; PersistentMsgIds -> Self = self(), ok = rabbit_msg_store:sync( PersistentMsgIds, - fun () -> ok = rabbit_amqqueue:tx_commit_callback( + fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback( Self, Pubs, AckTags, From) end), {false, State} end. -do_tx_commit(Pubs, AckTags, From, State) -> - State1 = ack(AckTags, State), +tx_commit_from_msg_store(Pubs, AckTags, From, State) -> + DiskAcks = + lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags), + State1 = ack(DiskAcks, State), {PubSeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg, {SeqIdsAcc, StateN}) -> {SeqId, StateN1} = publish(Msg, false, true, StateN), {[SeqId | SeqIdsAcc], StateN1} end, {[], State1}, Pubs), - IndexState1 = rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState), + IndexState1 = + rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= DiskAcks, IndexState), gen_server2:reply(From, ok), State2 #vqstate { index_state = IndexState1 }. |
