diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-27 11:55:09 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-27 11:55:09 +0100 |
| commit | 5fc13e26ff8a1a0b5dbacec2aa19326fbbd8cd1f (patch) | |
| tree | cf18d9ee6ec9b7308c285854cb71db25665a45c4 | |
| parent | 9577456310849f81d03ac7be111879bb40f8373a (diff) | |
| download | rabbitmq-server-git-5fc13e26ff8a1a0b5dbacec2aa19326fbbd8cd1f.tar.gz | |
Reworked handle_ch_down so that we detect early whether or not we should auto_delete the queue and only do rollback and requeue if we know that the queue isn't going to be deleted. ALSO minor refactoring in MQ with magic_marker
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 5 |
2 files changed, 23 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 406429ef8f..adf84c0e4c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -399,27 +399,27 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> unacked_messages = UAM} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), - State1 = - case Txn of - none -> State; - _ -> rollback_transaction(Txn, State) - end, - State2 = - deliver_or_requeue_n( - [MsgWithAck || - {_MsgId, MsgWithAck} <- dict:to_list(UAM)], - State1 #q { - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - active_consumers = remove_consumers( - ChPid, State1#q.active_consumers), - blocked_consumers = remove_consumers( - ChPid, State1#q.blocked_consumers)}), - case should_auto_delete(State2) of - false -> noreply(State2); - true -> {stop, normal, State2} + State1 = State#q{ + exclusive_consumer = case Holder of + {ChPid, _} -> none; + Other -> Other + end, + active_consumers = remove_consumers( + ChPid, State#q.active_consumers), + blocked_consumers = remove_consumers( + ChPid, State#q.blocked_consumers)}, + case should_auto_delete(State1) of + true -> + {stop, normal, State1}; + false -> + State2 = case Txn of + none -> State1; + _ -> rollback_transaction(Txn, State1) + end, + noreply( + deliver_or_requeue_n( + [MsgWithAck || + {_MsgId, MsgWithAck} <- dict:to_list(UAM)], State2)) end end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 0aa1b54263..c6f71fa638 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -572,9 +572,8 @@ publish_magic_marker_message(Q) -> ok = rabbit_disk_queue:publish(Q, ensure_binary_properties(Msg), false). fetch_ack_magic_marker_message(Q) -> - {#basic_message { exchange_name = none, routing_key = internal, - is_persistent = true }, - false, AckTag, Length} = rabbit_disk_queue:fetch(Q), + {Msg, false, AckTag, Length} = rabbit_disk_queue:fetch(Q), + true = is_magic_marker_message(Msg), ok = rabbit_disk_queue:ack(Q, [AckTag]), {ok, Length}. |
