summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHubert Plociniczak <hubert@lshift.net>2008-10-30 17:04:20 +0000
committerHubert Plociniczak <hubert@lshift.net>2008-10-30 17:04:20 +0000
commita39933ca41b4fa3f48fc7c19506e7b64be8b2ad3 (patch)
tree80d2dc7959f1cb52c312df988bd545104a1cabe3 /src
parenteae657af80af5bb98bacb118346c3a68b6aa8e0c (diff)
parent44295f44d75ea1fabfb9cbd1693d9ebb598f9866 (diff)
downloadrabbitmq-server-git-a39933ca41b4fa3f48fc7c19506e7b64be8b2ad3.tar.gz
Merge two default branches into single one
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl65
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_reader.erl31
-rw-r--r--src/rabbit_writer.erl10
4 files changed, 83 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ef3a9f0ebf..0544d32e6a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -572,29 +572,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
nowait = NoWait,
- arguments = Arguments},
- _, State = #ch{ virtual_host = VHostPath }) ->
- %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML)
- %% FIXME: don't allow binding to internal exchanges - including the one named "" !
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
- State),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- case rabbit_amqqueue:add_binding(QueueName, ExchangeName,
- ActualRoutingKey, Arguments) of
- {error, queue_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
- {error, exchange_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
- {error, durability_settings_incompatible} ->
- rabbit_misc:protocol_error(
- not_allowed, "durability settings of ~s incompatible with ~s",
- [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
- {ok, _BindingCount} ->
- return_ok(State, NoWait, #'queue.bind_ok'{})
- end;
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin,
+ QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
+ NoWait, State);
+
+handle_method(#'queue.unbind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin,
+ QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
+ false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
@@ -636,6 +625,36 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ %% FIXME: connection exception (!) on failure??
+ %% (see rule named "failure" in spec-XML)
+ %% FIXME: don't allow binding to internal exchanges -
+ %% including the one named "" !
+ QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
+ State),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of
+ {error, queue_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
+ {error, exchange_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ {error, binding_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no binding ~s between ~s and ~s",
+ [RoutingKey, rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(QueueName)]);
+ {error, durability_settings_incompatible} ->
+ rabbit_misc:protocol_error(
+ not_allowed, "durability settings of ~s incompatible with ~s",
+ [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
+ {ok, _BindingCount} ->
+ return_ok(State, NoWait, ReturnMethod)
+ end.
+
publish(Mandatory, Immediate, Message, QPids,
State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3e4ed8f36f..89648f4f1e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -34,7 +34,7 @@
-export([dirty_read/1]).
-export([r/3, r/2, rs/1]).
-export([enable_cover/0, report_cover/0]).
--export([with_exit_handler/2]).
+-export([throw_on_error/2, with_exit_handler/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
@@ -76,6 +76,8 @@
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(report_cover/0 :: () -> 'ok').
+-spec(throw_on_error/2 ::
+ (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(with_user/2 :: (username(), thunk(A)) -> A).
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
@@ -197,6 +199,13 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
end,
Mod]).
+throw_on_error(E, Thunk) ->
+ case Thunk() of
+ {error, Reason} -> throw({E, Reason});
+ {ok, Res} -> Res;
+ Res -> Res
+ end.
+
with_exit_handler(Handler, Thunk) ->
try
Thunk()
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7e68b3eddd..ce26c11a0b 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -166,14 +166,27 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
end.
+inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
+
+peername(Sock) ->
+ try
+ {Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
+ AddressS = inet_parse:ntoa(Address),
+ {AddressS, Port}
+ catch
+ Ex -> rabbit_log:error("error on TCP connection ~p:~p~n",
+ [self(), Ex]),
+ rabbit_log:info("closing TCP connection ~p", [self()]),
+ exit(normal)
+ end.
+
start_connection(Parent, Deb, ClientSock) ->
- ProfilingValue = setup_profiling(),
process_flag(trap_exit, true),
- {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock),
- PeerAddressS = inet_parse:ntoa(PeerAddress),
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ {PeerAddressS, PeerPort} = peername(ClientSock),
+ ProfilingValue = setup_profiling(),
try
+ rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
+ [self(), PeerAddressS, PeerPort]),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
mainloop(Parent, Deb, switch_callback(
@@ -266,7 +279,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end.
switch_callback(OldState, NewCallback, Length) ->
- {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1),
+ Ref = inet_op(fun () -> prim_inet:async_recv(
+ OldState#v1.sock, Length, -1) end),
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
@@ -472,7 +486,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
end;
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>),
+ ok = inet_op(fun () -> gen_tcp:send(
+ Sock, <<"AMQP",1,1,
+ ?PROTOCOL_VERSION_MAJOR,
+ ?PROTOCOL_VERSION_MINOR>>) end),
throw({bad_header, Other});
handle_input(Callback, Data, _State) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 0f6bca91bc..2c7fa2ab90 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -127,12 +127,16 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
Channel, Content, FrameMax),
[MethodFrame | ContentFrames].
+tcp_send(Sock, Data) ->
+ rabbit_misc:throw_on_error(inet_error,
+ fun () -> gen_tcp:send(Sock, Data) end).
+
internal_send_command(Sock, Channel, MethodRecord) ->
- ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)).
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
- ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)).
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
+ Content, FrameMax)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from