diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-16 14:35:15 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-16 14:35:15 +0100 |
| commit | 4dfaaebf75782966558879647dc84a1787850db9 (patch) | |
| tree | 27098c874104fdb75d1813b14c1dfeccc1838eaa | |
| parent | fd898a5062c75057f1d7979ada125a0f5ac1ae0b (diff) | |
| download | rabbitmq-server-git-4dfaaebf75782966558879647dc84a1787850db9.tar.gz | |
Bug fix.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 14 |
3 files changed, 19 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3367c75407..561e9e6954 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,7 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2, tx_commit_callback/3]). +-export([notify_sent/2, unblock/2, tx_commit_callback/4]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -107,7 +107,8 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(tx_commit_callback/3 :: (pid(), [message()], [acktag()]) -> 'ok'). +-spec(tx_commit_callback/4 :: (pid(), [message()], [acktag()], {pid(), any()}) + -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -321,8 +322,8 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {unblock, ChPid}). -tx_commit_callback(QPid, Pubs, AckTags) -> - gen_server2:pcast(QPid, 8, {tx_commit_callback, Pubs, AckTags}). +tx_commit_callback(QPid, Pubs, AckTags, From) -> + gen_server2:pcast(QPid, 8, {tx_commit_callback, Pubs, AckTags, From}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e9711b54b6..66fc45ea26 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -470,7 +470,7 @@ record_pending_acks(Txn, ChPid, MsgIds) -> store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). -commit_transaction(Txn, State) -> +commit_transaction(Txn, From, State) -> #tx { ch_pid = ChPid, pending_messages = PendingMessages, pending_acks = PendingAcks @@ -487,7 +487,7 @@ commit_transaction(Txn, State) -> [AckTag || {_Msg, AckTag} <- MsgsWithAcks] end, VQS = rabbit_variable_queue:tx_commit( - PendingMessagesOrdered, Acks, State #q.variable_queue_state), + PendingMessagesOrdered, Acks, From, State #q.variable_queue_state), State #q { variable_queue_state = VQS }. rollback_transaction(Txn, State) -> @@ -573,9 +573,7 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> - NewState = commit_transaction(Txn, State), - %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), + NewState = commit_transaction(Txn, From, State), erase_tx(Txn), noreply(run_message_queue(NewState)); @@ -783,10 +781,11 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({tx_commit_callback, Pubs, AckTags}, +handle_cast({tx_commit_callback, Pubs, AckTags, From}, State = #q{variable_queue_state = VQS}) -> noreply(State#q{variable_queue_state = - rabbit_variable_queue:do_tx_commit(Pubs, AckTags, VQS)}); + rabbit_variable_queue:do_tx_commit( + Pubs, AckTags, From, VQS)}); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 745d59eadd..dddfb4a85c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, - tx_publish/2, tx_rollback/2, tx_commit/3, do_tx_commit/3]). + tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]). %%---------------------------------------------------------------------------- @@ -349,21 +349,21 @@ tx_rollback(Pubs, State) -> end, State. -tx_commit(Pubs, AckTags, State) -> +tx_commit(Pubs, AckTags, From, State) -> case persistent_msg_ids(Pubs) of [] -> - do_tx_commit(Pubs, AckTags, State); + do_tx_commit(Pubs, AckTags, From, State); PersistentMsgIds -> Self = self(), ok = rabbit_msg_store:sync( PersistentMsgIds, fun () -> ok = rabbit_amqqueue:tx_commit_callback( - Self, Pubs, AckTags) + Self, Pubs, AckTags, From) end), State end. -do_tx_commit(Pubs, AckTags, State) -> +do_tx_commit(Pubs, AckTags, From, State) -> {_PubSeqIds, State1} = lists:foldl( fun (Msg, {SeqIdsAcc, StateN}) -> @@ -371,7 +371,9 @@ do_tx_commit(Pubs, AckTags, State) -> {[SeqId | SeqIdsAcc], StateN1} end, {[], State}, Pubs), %% TODO need to do something here about syncing the queue index, PubSeqIds - ack(AckTags, State1). + State2 = ack(AckTags, State1), + gen_server2:reply(From, ok), + State2. %%---------------------------------------------------------------------------- |
