diff options
| author | Hubert Plociniczak <hubert@lshift.net> | 2008-10-30 17:04:20 +0000 |
|---|---|---|
| committer | Hubert Plociniczak <hubert@lshift.net> | 2008-10-30 17:04:20 +0000 |
| commit | a39933ca41b4fa3f48fc7c19506e7b64be8b2ad3 (patch) | |
| tree | 80d2dc7959f1cb52c312df988bd545104a1cabe3 | |
| parent | eae657af80af5bb98bacb118346c3a68b6aa8e0c (diff) | |
| parent | 44295f44d75ea1fabfb9cbd1693d9ebb598f9866 (diff) | |
| download | rabbitmq-server-git-a39933ca41b4fa3f48fc7c19506e7b64be8b2ad3.tar.gz | |
Merge two default branches into single one
| -rw-r--r-- | src/rabbit_channel.erl | 65 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 10 |
4 files changed, 83 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ef3a9f0ebf..0544d32e6a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -572,29 +572,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, nowait = NoWait, - arguments = Arguments}, - _, State = #ch{ virtual_host = VHostPath }) -> - %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), - ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case rabbit_amqqueue:add_binding(QueueName, ExchangeName, - ActualRoutingKey, Arguments) of - {error, queue_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); - {ok, _BindingCount} -> - return_ok(State, NoWait, #'queue.bind_ok'{}) - end; + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, + NoWait, State); + +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, + false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -636,6 +625,36 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + %% FIXME: connection exception (!) on failure?? + %% (see rule named "failure" in spec-XML) + %% FIXME: don't allow binding to internal exchanges - + %% including the one named "" ! + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, + State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of + {error, queue_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(QueueName)]); + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + {error, binding_not_found} -> + rabbit_misc:protocol_error( + not_found, "no binding ~s between ~s and ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); + {error, durability_settings_incompatible} -> + rabbit_misc:protocol_error( + not_allowed, "durability settings of ~s incompatible with ~s", + [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); + {ok, _BindingCount} -> + return_ok(State, NoWait, ReturnMethod) + end. + publish(Mandatory, Immediate, Message, QPids, State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> Handled = deliver(QPids, Mandatory, Immediate, TxnKey, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3e4ed8f36f..89648f4f1e 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -34,7 +34,7 @@ -export([dirty_read/1]). -export([r/3, r/2, rs/1]). -export([enable_cover/0, report_cover/0]). --export([with_exit_handler/2]). +-export([throw_on_error/2, with_exit_handler/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). @@ -76,6 +76,8 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). +-spec(throw_on_error/2 :: + (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). @@ -197,6 +199,13 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +throw_on_error(E, Thunk) -> + case Thunk() of + {error, Reason} -> throw({E, Reason}); + {ok, Res} -> Res; + Res -> Res + end. + with_exit_handler(Handler, Thunk) -> try Thunk() diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7e68b3eddd..ce26c11a0b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -166,14 +166,27 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). + +peername(Sock) -> + try + {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + AddressS = inet_parse:ntoa(Address), + {AddressS, Port} + catch + Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Ex]), + rabbit_log:info("closing TCP connection ~p", [self()]), + exit(normal) + end. + start_connection(Parent, Deb, ClientSock) -> - ProfilingValue = setup_profiling(), process_flag(trap_exit, true), - {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock), - PeerAddressS = inet_parse:ntoa(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + {PeerAddressS, PeerPort} = peername(ClientSock), + ProfilingValue = setup_profiling(), try + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), mainloop(Parent, Deb, switch_callback( @@ -266,7 +279,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1), + Ref = inet_op(fun () -> prim_inet:async_recv( + OldState#v1.sock, Length, -1) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -472,7 +486,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, end; handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>), + ok = inet_op(fun () -> gen_tcp:send( + Sock, <<"AMQP",1,1, + ?PROTOCOL_VERSION_MAJOR, + ?PROTOCOL_VERSION_MINOR>>) end), throw({bad_header, Other}); handle_input(Callback, Data, _State) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 0f6bca91bc..2c7fa2ab90 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -127,12 +127,16 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> Channel, Content, FrameMax), [MethodFrame | ContentFrames]. +tcp_send(Sock, Data) -> + rabbit_misc:throw_on_error(inet_error, + fun () -> gen_tcp:send(Sock, Data) end). + internal_send_command(Sock, Channel, MethodRecord) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from |
