summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-27 11:55:09 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-27 11:55:09 +0100
commit5fc13e26ff8a1a0b5dbacec2aa19326fbbd8cd1f (patch)
treecf18d9ee6ec9b7308c285854cb71db25665a45c4
parent9577456310849f81d03ac7be111879bb40f8373a (diff)
downloadrabbitmq-server-git-5fc13e26ff8a1a0b5dbacec2aa19326fbbd8cd1f.tar.gz
Reworked handle_ch_down so that we detect early whether or not we should auto_delete the queue and only do rollback and requeue if we know that the queue isn't going to be deleted. ALSO minor refactoring in MQ with magic_marker
-rw-r--r--src/rabbit_amqqueue_process.erl42
-rw-r--r--src/rabbit_mixed_queue.erl5
2 files changed, 23 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 406429ef8f..adf84c0e4c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -399,27 +399,27 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
unacked_messages = UAM} ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
- State1 =
- case Txn of
- none -> State;
- _ -> rollback_transaction(Txn, State)
- end,
- State2 =
- deliver_or_requeue_n(
- [MsgWithAck ||
- {_MsgId, MsgWithAck} <- dict:to_list(UAM)],
- State1 #q {
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- active_consumers = remove_consumers(
- ChPid, State1#q.active_consumers),
- blocked_consumers = remove_consumers(
- ChPid, State1#q.blocked_consumers)}),
- case should_auto_delete(State2) of
- false -> noreply(State2);
- true -> {stop, normal, State2}
+ State1 = State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, _} -> none;
+ Other -> Other
+ end,
+ active_consumers = remove_consumers(
+ ChPid, State#q.active_consumers),
+ blocked_consumers = remove_consumers(
+ ChPid, State#q.blocked_consumers)},
+ case should_auto_delete(State1) of
+ true ->
+ {stop, normal, State1};
+ false ->
+ State2 = case Txn of
+ none -> State1;
+ _ -> rollback_transaction(Txn, State1)
+ end,
+ noreply(
+ deliver_or_requeue_n(
+ [MsgWithAck ||
+ {_MsgId, MsgWithAck} <- dict:to_list(UAM)], State2))
end
end.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 0aa1b54263..c6f71fa638 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -572,9 +572,8 @@ publish_magic_marker_message(Q) ->
ok = rabbit_disk_queue:publish(Q, ensure_binary_properties(Msg), false).
fetch_ack_magic_marker_message(Q) ->
- {#basic_message { exchange_name = none, routing_key = internal,
- is_persistent = true },
- false, AckTag, Length} = rabbit_disk_queue:fetch(Q),
+ {Msg, false, AckTag, Length} = rabbit_disk_queue:fetch(Q),
+ true = is_magic_marker_message(Msg),
ok = rabbit_disk_queue:ack(Q, [AckTag]),
{ok, Length}.