summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-16 11:55:02 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-16 11:55:02 +0100
commit67b986d695826fe3d906f95e0b886a26790c75c2 (patch)
tree1cad6fb9a2d56215a6427090312052011c32ab02 /src
parenta38eb2d9ffb1740bde3f79c38505f19db4646cee (diff)
downloadrabbitmq-server-git-67b986d695826fe3d906f95e0b886a26790c75c2.tar.gz
Cope with queue death even though there are in-flight commits going on
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_variable_queue.erl57
3 files changed, 43 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index cc6f08b7d4..3ca39e108f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -346,7 +346,8 @@ unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 7, {unblock, ChPid}).
maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:pcast(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}).
+ gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun},
+ infinity).
flush_all(QPids, ChPid) ->
safe_pmap_ok(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index efbc276692..5fda693524 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -136,8 +136,6 @@ terminate({shutdown, _}, State) ->
terminate(_Reason, State) ->
ok = rabbit_memory_monitor:deregister(self()),
%% FIXME: How do we cancel active subscriptions?
- %% Ensure that any persisted tx messages are removed.
- %% TODO: wait for all in flight tx_commits to complete
State1 = terminate_shutdown(delete_and_terminate, State),
ok = rabbit_amqqueue:internal_delete(qname(State1)).
@@ -733,7 +731,10 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(ok, State);
_ ->
reply(locked, State)
- end.
+ end;
+
+handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
handle_cast(init_backing_queue, State = #q{backing_queue_state = undefined,
@@ -793,9 +794,6 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Fun, State));
-
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
possibly_unblock(
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 90e1eb6c2a..aa1589a6d0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -319,15 +319,17 @@ init(QueueName, IsDurable) ->
},
maybe_deltas_to_betas(State).
-terminate(State = #vqstate {
- persistent_count = PCount,
- index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} }) ->
+terminate(State) ->
+ State1 = #vqstate {
+ persistent_count = PCount, index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} } =
+ tx_commit_index(State),
rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
Terms = [{persistent_ref, PRef}, {transient_ref, TRef},
{persistent_count, PCount}],
- State #vqstate { index_state = rabbit_queue_index:terminate(Terms, IndexState) }.
+ State1 #vqstate { index_state =
+ rabbit_queue_index:terminate(Terms, IndexState) }.
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
@@ -559,19 +561,10 @@ tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) ->
tx_commit_post_msg_store(
IsTransientPubs, PubsOrdered, AckTags1, From, State);
false ->
- Self = self(),
- ok =
- rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE, PersistentGuids,
- fun () ->
- ok =
- rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self,
- fun (StateN) -> tx_commit_post_msg_store(
- IsTransientPubs, PubsOrdered,
- AckTags1, From, StateN)
- end)
- end),
+ ok = rabbit_msg_store:sync(
+ ?PERSISTENT_MSG_STORE, PersistentGuids,
+ msg_store_callback(PersistentGuids, IsTransientPubs,
+ PubsOrdered, AckTags1, From)),
State
end}.
@@ -832,13 +825,37 @@ should_force_index_to_disk(State =
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
+msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, From) ->
+ Self = self(),
+ fun() ->
+ spawn(
+ fun() ->
+ ok = rabbit_misc:with_exit_handler(
+ fun() -> rabbit_msg_store:remove(
+ ?PERSISTENT_MSG_STORE,
+ PersistentGuids)
+ end,
+ fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
+ Self, fun (StateN) ->
+ tx_commit_post_msg_store(
+ IsTransientPubs, Pubs,
+ AckTags, From, StateN)
+ end)
+ end)
+ end)
+ end.
+
tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
#vqstate { on_sync = OnSync = {SAcks, SPubs, SFroms},
- persistent_store = PersistentStore }) ->
+ persistent_store = PersistentStore,
+ pending_ack = PA }) ->
%% If we are a non-durable queue, or (no persisent pubs, and no
%% persistent acks) then we can skip the queue_index loop.
case PersistentStore == ?TRANSIENT_MSG_STORE orelse
- (IsTransientPubs andalso [] == AckTags) of %%% AGH FIX ME
+ (IsTransientPubs andalso
+ lists:foldl(fun (AckTag, true ) -> dict:is_key(AckTag, PA);
+ (_AckTag, false) -> false
+ end, true, AckTags)) of
true -> State1 = tx_commit_index(State #vqstate {
on_sync = {[], [Pubs], [From]} }),
State1 #vqstate { on_sync = OnSync };