diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-20 13:33:53 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-20 13:33:53 +0000 |
| commit | b4ce6c28f3f3f3569435953865f848d8a38ff4c9 (patch) | |
| tree | f888535f749e9e2ac505db5f14d4137d7042a6bd | |
| parent | f60c79ae1b13150e449a50c5a01bafa5b8ba074f (diff) | |
| download | rabbitmq-server-git-b4ce6c28f3f3f3569435953865f848d8a38ff4c9.tar.gz | |
Abstract the waiting into rabbit_amqqueue.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 15 |
2 files changed, 17 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 85d1f28311..4a7324a992 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,7 +20,7 @@ delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, - assert_equivalence/5, + wait_for_recovery/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). @@ -90,6 +90,9 @@ -spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). +-spec(wait_for_recovery/2 :: + (pid(), name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | + rabbit_types:error('not_found')). -spec(assert_equivalence/5 :: (rabbit_types:amqqueue(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) @@ -378,6 +381,14 @@ with_or_die(Name, F) -> ({absent, Q}) -> rabbit_misc:absent(Q) end). +wait_for_recovery(OldQPid, QName) -> + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue{pid = OldQPid}} -> timer:sleep(25), + wait_for_recovery(OldQPid, QName); + {ok, Q} -> {ok, Q}; + {error, not_found} -> {error, not_found} + end. + assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, Durable, AutoDelete, RequiredArgs, Owner) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3e800cf197..dac02b21ba 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1270,16 +1270,11 @@ handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, queue_down_consumer_action(QPid, QName, CTag, CMap) -> {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of - {bool, true} -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = QPid}} -> timer:sleep(25), - queue_down_consumer_action( - QPid, QName, CTag, CMap); - {ok, _Q} -> {recover, ConsumeSpec}; - {error, not_found} -> remove - end; - _ -> - remove + {bool, true} -> case rabbit_amqqueue:wait_for_recovery(QPid, QName) of + {ok, _Q} -> {recover, ConsumeSpec}; + {error, not_found} -> remove + end; + _ -> remove end. handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> |
