diff options
| author | Tony Garnock-Jones <tonyg@lshift.net> | 2008-10-30 14:10:55 +0000 |
|---|---|---|
| committer | Tony Garnock-Jones <tonyg@lshift.net> | 2008-10-30 14:10:55 +0000 |
| commit | 44295f44d75ea1fabfb9cbd1693d9ebb598f9866 (patch) | |
| tree | 9ec957eebcfb60871f68247aee754e2e8bcb4a6f /src | |
| parent | 50a0752fb10922db34b17d3d1f499ca80c95329b (diff) | |
| parent | 6a5a9f667bb30b98c3b834c2882e8d9c51c0260e (diff) | |
| download | rabbitmq-server-git-44295f44d75ea1fabfb9cbd1693d9ebb598f9866.tar.gz | |
merge bug19250 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 65 |
1 files changed, 42 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a9278898ea..7cec5f8654 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -572,29 +572,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, nowait = NoWait, - arguments = Arguments}, - _, State = #ch{ virtual_host = VHostPath }) -> - %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), - ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case rabbit_amqqueue:add_binding(QueueName, ExchangeName, - ActualRoutingKey, Arguments) of - {error, queue_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); - {ok, _BindingCount} -> - return_ok(State, NoWait, #'queue.bind_ok'{}) - end; + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, + NoWait, State); + +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, + false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -636,6 +625,36 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + %% FIXME: connection exception (!) on failure?? + %% (see rule named "failure" in spec-XML) + %% FIXME: don't allow binding to internal exchanges - + %% including the one named "" ! + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, + State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of + {error, queue_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(QueueName)]); + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + {error, binding_not_found} -> + rabbit_misc:protocol_error( + not_found, "no binding ~s between ~s and ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); + {error, durability_settings_incompatible} -> + rabbit_misc:protocol_error( + not_allowed, "durability settings of ~s incompatible with ~s", + [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); + {ok, _BindingCount} -> + return_ok(State, NoWait, ReturnMethod) + end. + publish(Mandatory, Immediate, Message, QPids, State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> Handled = deliver(QPids, Mandatory, Immediate, TxnKey, |
