diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-20 13:52:22 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-20 13:52:22 +0000 |
| commit | 17ed6872d4f94ba3077f7850d58ed334c8300867 (patch) | |
| tree | 8d40c4fcbe82393f57291b2830d554a9492b2485 /src | |
| parent | b4ce6c28f3f3f3569435953865f848d8a38ff4c9 (diff) | |
| download | rabbitmq-server-git-17ed6872d4f94ba3077f7850d58ed334c8300867.tar.gz | |
Attempt at error checking the basic.consume.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 53 |
1 files changed, 31 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dac02b21ba..91dd9d267f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -767,9 +767,16 @@ handle_method(#'basic.consume'{queue = QueueNameBin, "amq.ctag"); Other -> Other end, - {noreply, basic_consume( - QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, - ExclusiveConsume, Args, NoWait, State)}; + case basic_consume( + QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, State) of + {ok, State1} -> + {noreply, State1}; + {error, exclusive_consume_unavailable} -> + rabbit_misc:protocol_error( + access_refused, "~s in exclusive use", + [rabbit_misc:rs(QueueName)]) + end; {ok, _} -> %% Attempted reuse of consumer tag. rabbit_misc:protocol_error( @@ -1185,14 +1192,12 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, State1 = monitor_delivering_queue( NoAck, QPid, QName, State#ch{consumer_mapping = CM1}), - case NoWait of - true -> consumer_monitor(ActualConsumerTag, State1); - false -> State1 - end; - {{error, exclusive_consume_unavailable}, _Q} -> - rabbit_misc:protocol_error( - access_refused, "~s in exclusive use", - [rabbit_misc:rs(QueueName)]) + {ok, case NoWait of + true -> consumer_monitor(ActualConsumerTag, State1); + false -> State1 + end}; + {{error, exclusive_consume_unavailable} = E, _Q} -> + E end. consumer_monitor(ConsumerTag, @@ -1252,21 +1257,25 @@ handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, QName = dict:fetch(QPid, QNames), case queue_down_consumer_action(QPid, QName, CTag, CMap) of remove -> - ok = send(#'basic.cancel'{consumer_tag = CTag, - nowait = true}, - StateN), - rabbit_event:notify( - consumer_deleted, [{consumer_tag, CTag}, - {channel, self()}, - {queue, QName}]), - StateN#ch{consumer_mapping = dict:erase(CTag, CMap)}; + cancel_consumer(CTag, QName, StateN); {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} -> - basic_consume( - QName, NoAck, ConsumerPrefetch, CTag, - Exclusive, Args, true, StateN) + case basic_consume( + QName, NoAck, ConsumerPrefetch, CTag, + Exclusive, Args, true, StateN) of + {ok, StateN1} -> StateN1; + {error, _} -> cancel_consumer(CTag, QName, StateN) + end end end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags). +cancel_consumer(CTag, QName, State = #ch{consumer_mapping = CMap}) -> + ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, State), + rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag}, + {channel, self()}, + {queue, QName}]), + State#ch{consumer_mapping = dict:erase(CTag, CMap)}. + queue_down_consumer_action(QPid, QName, CTag, CMap) -> {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of |
