diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-16 11:55:02 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-16 11:55:02 +0100 |
| commit | 67b986d695826fe3d906f95e0b886a26790c75c2 (patch) | |
| tree | 1cad6fb9a2d56215a6427090312052011c32ab02 /src | |
| parent | a38eb2d9ffb1740bde3f79c38505f19db4646cee (diff) | |
| download | rabbitmq-server-git-67b986d695826fe3d906f95e0b886a26790c75c2.tar.gz | |
Cope with queue death even though there are in-flight commits going on
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 57 |
3 files changed, 43 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index cc6f08b7d4..3ca39e108f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -346,7 +346,8 @@ unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 7, {unblock, ChPid}). maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcast(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}). + gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}, + infinity). flush_all(QPids, ChPid) -> safe_pmap_ok( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index efbc276692..5fda693524 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -136,8 +136,6 @@ terminate({shutdown, _}, State) -> terminate(_Reason, State) -> ok = rabbit_memory_monitor:deregister(self()), %% FIXME: How do we cancel active subscriptions? - %% Ensure that any persisted tx messages are removed. - %% TODO: wait for all in flight tx_commits to complete State1 = terminate_shutdown(delete_and_terminate, State), ok = rabbit_amqqueue:internal_delete(qname(State1)). @@ -733,7 +731,10 @@ handle_call({claim_queue, ReaderPid}, _From, reply(ok, State); _ -> reply(locked, State) - end. + end; + +handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> + reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). handle_cast(init_backing_queue, State = #q{backing_queue_state = undefined, @@ -793,9 +794,6 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> - noreply(maybe_run_queue_via_backing_queue(Fun, State)); - handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( possibly_unblock( diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 90e1eb6c2a..aa1589a6d0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -319,15 +319,17 @@ init(QueueName, IsDurable) -> }, maybe_deltas_to_betas(State). -terminate(State = #vqstate { - persistent_count = PCount, - index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} }) -> +terminate(State) -> + State1 = #vqstate { + persistent_count = PCount, index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} } = + tx_commit_index(State), rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], - State #vqstate { index_state = rabbit_queue_index:terminate(Terms, IndexState) }. + State1 #vqstate { index_state = + rabbit_queue_index:terminate(Terms, IndexState) }. %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. @@ -559,19 +561,10 @@ tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) -> tx_commit_post_msg_store( IsTransientPubs, PubsOrdered, AckTags1, From, State); false -> - Self = self(), - ok = - rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentGuids, - fun () -> - ok = - rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, - fun (StateN) -> tx_commit_post_msg_store( - IsTransientPubs, PubsOrdered, - AckTags1, From, StateN) - end) - end), + ok = rabbit_msg_store:sync( + ?PERSISTENT_MSG_STORE, PersistentGuids, + msg_store_callback(PersistentGuids, IsTransientPubs, + PubsOrdered, AckTags1, From)), State end}. @@ -832,13 +825,37 @@ should_force_index_to_disk(State = %% Internal major helpers for Public API %%---------------------------------------------------------------------------- +msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, From) -> + Self = self(), + fun() -> + spawn( + fun() -> + ok = rabbit_misc:with_exit_handler( + fun() -> rabbit_msg_store:remove( + ?PERSISTENT_MSG_STORE, + PersistentGuids) + end, + fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + Self, fun (StateN) -> + tx_commit_post_msg_store( + IsTransientPubs, Pubs, + AckTags, From, StateN) + end) + end) + end) + end. + tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State = #vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms}, - persistent_store = PersistentStore }) -> + persistent_store = PersistentStore, + pending_ack = PA }) -> %% If we are a non-durable queue, or (no persisent pubs, and no %% persistent acks) then we can skip the queue_index loop. case PersistentStore == ?TRANSIENT_MSG_STORE orelse - (IsTransientPubs andalso [] == AckTags) of %%% AGH FIX ME + (IsTransientPubs andalso + lists:foldl(fun (AckTag, true ) -> dict:is_key(AckTag, PA); + (_AckTag, false) -> false + end, true, AckTags)) of true -> State1 = tx_commit_index(State #vqstate { on_sync = {[], [Pubs], [From]} }), State1 #vqstate { on_sync = OnSync }; |
