summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-23 14:28:24 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-23 14:28:24 +0100
commit9ab1135f64baa7f59fbc3cb2e6b43f2a1614c748 (patch)
treec6a52d758cf8334aed12c004358d65091f2e1acc /src
parent21cca2413fcf784f44058c207b59bbc92e519856 (diff)
downloadrabbitmq-server-git-9ab1135f64baa7f59fbc3cb2e6b43f2a1614c748.tar.gz
mainly cosmetic renamings. Also added support to queue_index:sync so that you can indicate whether or not the ack journal should be sync'd. Use of strace shows that in the test in bug 20470 #c6, we're doing 2 fsyncs per txn, which makes sense - one for the qi, and one for the msg_store. However, only getting about 180 txns/sec, as opposed to 350 as reported in that bug.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_queue_index.erl12
-rw-r--r--src/rabbit_variable_queue.erl15
4 files changed, 26 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 561e9e6954..d0a5f205a8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,7 +40,7 @@
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2, tx_commit_callback/4]).
+-export([notify_sent/2, unblock/2, tx_commit_msg_store_callback/4]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -107,8 +107,8 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(tx_commit_callback/4 :: (pid(), [message()], [acktag()], {pid(), any()})
- -> 'ok').
+-spec(tx_commit_msg_store_callback/4 :: (pid(), [message()], [acktag()],
+ {pid(), any()}) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -322,8 +322,9 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {unblock, ChPid}).
-tx_commit_callback(QPid, Pubs, AckTags, From) ->
- gen_server2:pcast(QPid, 8, {tx_commit_callback, Pubs, AckTags, From}).
+tx_commit_msg_store_callback(QPid, Pubs, AckTags, From) ->
+ gen_server2:pcast(QPid, 8,
+ {tx_commit_msg_store_callback, Pubs, AckTags, From}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e2477e988b..434652a58b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -788,12 +788,13 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({tx_commit_callback, Pubs, AckTags, From},
+handle_cast({tx_commit_msg_store_callback, Pubs, AckTags, From},
State = #q{variable_queue_state = VQS}) ->
noreply(
run_message_queue(
State#q{variable_queue_state =
- rabbit_variable_queue:do_tx_commit(Pubs, AckTags, From, VQS)}));
+ rabbit_variable_queue:tx_commit_from_msg_store(
+ Pubs, AckTags, From, VQS)}));
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 7259eaa23f..67637ed2d7 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,7 @@
-module(rabbit_queue_index).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/2,
+ write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/3,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
@@ -217,9 +217,13 @@ full_flush_journal(State) ->
{false, State1} -> State1
end.
-sync_seq_ids(SeqIds, State) ->
- {Hdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:sync(Hdl),
+sync_seq_ids(SeqIds, SyncAckJournal, State) ->
+ State1 = case SyncAckJournal of
+ true -> {Hdl, State2} = get_journal_handle(State),
+ ok = file_handle_cache:sync(Hdl),
+ State2;
+ false -> State
+ end,
SegNumsSet =
lists:foldl(
fun (SeqId, Set) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4dce1f6a64..cb2bdca718 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,7 +34,7 @@
-export([init/1, terminate/1, publish/2, publish_delivered/2,
set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1,
ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1,
- requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]).
+ requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4]).
%%----------------------------------------------------------------------------
@@ -367,26 +367,29 @@ tx_rollback(Pubs, State) ->
tx_commit(Pubs, AckTags, From, State) ->
case persistent_msg_ids(Pubs) of
[] ->
- {true, do_tx_commit(Pubs, AckTags, From, State)};
+ {true, tx_commit_from_msg_store(Pubs, AckTags, From, State)};
PersistentMsgIds ->
Self = self(),
ok = rabbit_msg_store:sync(
PersistentMsgIds,
- fun () -> ok = rabbit_amqqueue:tx_commit_callback(
+ fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback(
Self, Pubs, AckTags, From)
end),
{false, State}
end.
-do_tx_commit(Pubs, AckTags, From, State) ->
- State1 = ack(AckTags, State),
+tx_commit_from_msg_store(Pubs, AckTags, From, State) ->
+ DiskAcks =
+ lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags),
+ State1 = ack(DiskAcks, State),
{PubSeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg, {SeqIdsAcc, StateN}) ->
{SeqId, StateN1} = publish(Msg, false, true, StateN),
{[SeqId | SeqIdsAcc], StateN1}
end, {[], State1}, Pubs),
- IndexState1 = rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState),
+ IndexState1 =
+ rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= DiskAcks, IndexState),
gen_server2:reply(From, ok),
State2 #vqstate { index_state = IndexState1 }.