summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-16 14:35:15 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-16 14:35:15 +0100
commit4dfaaebf75782966558879647dc84a1787850db9 (patch)
tree27098c874104fdb75d1813b14c1dfeccc1838eaa
parentfd898a5062c75057f1d7979ada125a0f5ac1ae0b (diff)
downloadrabbitmq-server-git-4dfaaebf75782966558879647dc84a1787850db9.tar.gz
Bug fix.
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_variable_queue.erl14
3 files changed, 19 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3367c75407..561e9e6954 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,7 +40,7 @@
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2, tx_commit_callback/3]).
+-export([notify_sent/2, unblock/2, tx_commit_callback/4]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -107,7 +107,8 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(tx_commit_callback/3 :: (pid(), [message()], [acktag()]) -> 'ok').
+-spec(tx_commit_callback/4 :: (pid(), [message()], [acktag()], {pid(), any()})
+ -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -321,8 +322,8 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {unblock, ChPid}).
-tx_commit_callback(QPid, Pubs, AckTags) ->
- gen_server2:pcast(QPid, 8, {tx_commit_callback, Pubs, AckTags}).
+tx_commit_callback(QPid, Pubs, AckTags, From) ->
+ gen_server2:pcast(QPid, 8, {tx_commit_callback, Pubs, AckTags, From}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9711b54b6..66fc45ea26 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -470,7 +470,7 @@ record_pending_acks(Txn, ChPid, MsgIds) ->
store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
ch_pid = ChPid}).
-commit_transaction(Txn, State) ->
+commit_transaction(Txn, From, State) ->
#tx { ch_pid = ChPid,
pending_messages = PendingMessages,
pending_acks = PendingAcks
@@ -487,7 +487,7 @@ commit_transaction(Txn, State) ->
[AckTag || {_Msg, AckTag} <- MsgsWithAcks]
end,
VQS = rabbit_variable_queue:tx_commit(
- PendingMessagesOrdered, Acks, State #q.variable_queue_state),
+ PendingMessagesOrdered, Acks, From, State #q.variable_queue_state),
State #q { variable_queue_state = VQS }.
rollback_transaction(Txn, State) ->
@@ -573,9 +573,7 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
- NewState = commit_transaction(Txn, State),
- %% optimisation: we reply straight away so the sender can continue
- gen_server2:reply(From, ok),
+ NewState = commit_transaction(Txn, From, State),
erase_tx(Txn),
noreply(run_message_queue(NewState));
@@ -783,10 +781,11 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({tx_commit_callback, Pubs, AckTags},
+handle_cast({tx_commit_callback, Pubs, AckTags, From},
State = #q{variable_queue_state = VQS}) ->
noreply(State#q{variable_queue_state =
- rabbit_variable_queue:do_tx_commit(Pubs, AckTags, VQS)});
+ rabbit_variable_queue:do_tx_commit(
+ Pubs, AckTags, From, VQS)});
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 745d59eadd..dddfb4a85c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,7 +34,7 @@
-export([init/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2,
remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1,
maybe_start_prefetcher/1, purge/1, delete/1, requeue/2,
- tx_publish/2, tx_rollback/2, tx_commit/3, do_tx_commit/3]).
+ tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]).
%%----------------------------------------------------------------------------
@@ -349,21 +349,21 @@ tx_rollback(Pubs, State) ->
end,
State.
-tx_commit(Pubs, AckTags, State) ->
+tx_commit(Pubs, AckTags, From, State) ->
case persistent_msg_ids(Pubs) of
[] ->
- do_tx_commit(Pubs, AckTags, State);
+ do_tx_commit(Pubs, AckTags, From, State);
PersistentMsgIds ->
Self = self(),
ok = rabbit_msg_store:sync(
PersistentMsgIds,
fun () -> ok = rabbit_amqqueue:tx_commit_callback(
- Self, Pubs, AckTags)
+ Self, Pubs, AckTags, From)
end),
State
end.
-do_tx_commit(Pubs, AckTags, State) ->
+do_tx_commit(Pubs, AckTags, From, State) ->
{_PubSeqIds, State1} =
lists:foldl(
fun (Msg, {SeqIdsAcc, StateN}) ->
@@ -371,7 +371,9 @@ do_tx_commit(Pubs, AckTags, State) ->
{[SeqId | SeqIdsAcc], StateN1}
end, {[], State}, Pubs),
%% TODO need to do something here about syncing the queue index, PubSeqIds
- ack(AckTags, State1).
+ State2 = ack(AckTags, State1),
+ gen_server2:reply(From, ok),
+ State2.
%%----------------------------------------------------------------------------