diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-11-03 12:11:34 +0000 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-11-03 12:11:34 +0000 |
| commit | 282b25837870c6addab1dc1867fd7cedd6d56b55 (patch) | |
| tree | 2a559c18d3d9db119c49d567b4e97173d2e69a2f | |
| parent | 7700cc6e4bf0251b3d7b4fc354200a496a46efd3 (diff) | |
| parent | a39933ca41b4fa3f48fc7c19506e7b64be8b2ad3 (diff) | |
| download | rabbitmq-server-git-282b25837870c6addab1dc1867fd7cedd6d56b55.tar.gz | |
Merge default into 19468
| -rw-r--r-- | packaging/RPMS/Fedora/Makefile | 31 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.logrotate | 2 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 30 | ||||
| -rw-r--r-- | packaging/debs/Debian/Makefile | 4 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/rabbitmq-server.logrotate | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 10 |
12 files changed, 134 insertions, 90 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 814c79f03c..33032f118f 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -4,32 +4,23 @@ VERSION=0.0.0 SOURCE_TARBALL_DIR=../../../dist TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz TOP_DIR=$(shell pwd) -RPM_VERSION=$(shell echo $(VERSION) | tr - _) -DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' --define 'debian 1' +DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1' rpms: clean server #Create proper environment for making rpms prepare: - mkdir -p $(TOP_DIR)/BUILD - mkdir -p $(TOP_DIR)/SOURCES - mkdir -p $(TOP_DIR)/SPECS - mkdir -p $(TOP_DIR)/SRPMS - mkdir -p $(TOP_DIR)/RPMS - mkdir -p $(TOP_DIR)/tmp - cp $(TOP_DIR)/$(TARBALL) $(TOP_DIR)/SOURCES - cp $(TOP_DIR)/rabbitmq-server.spec $(TOP_DIR)/SPECS - cp $(TOP_DIR)/init.d $(TOP_DIR)/BUILD - cp $(TOP_DIR)/rabbitmqctl_wrapper $(TOP_DIR)/BUILD - cp $(TOP_DIR)/rabbitmq-server.logrotate $(TOP_DIR)/BUILD + mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp + cp $(TOP_DIR)/$(TARBALL) SOURCES + cp rabbitmq-server.spec SPECS + sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec + + cp init.d SOURCES/rabbitmq-server.init + cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper + cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba $(TOP_DIR)/SPECS/rabbitmq-server.spec $(DEFINES) --target noarch + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target noarch clean: - rm -rf $(TOP_DIR)/SOURCES/ - rm -rf $(TOP_DIR)/SPECS/ - rm -rf $(TOP_DIR)/RPMS/ - rm -rf $(TOP_DIR)/SRPMS/ - rm -rf $(TOP_DIR)/BUILD/ - rm -rf $(TOP_DIR)/tmp/ + rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp diff --git a/packaging/RPMS/Fedora/rabbitmq-server.logrotate b/packaging/RPMS/Fedora/rabbitmq-server.logrotate index 64cd01a185..ab87e4a5c6 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.logrotate +++ b/packaging/RPMS/Fedora/rabbitmq-server.logrotate @@ -9,4 +9,4 @@ postrotate /sbin/service rabbitmq-server rotate-logs endscript -}
\ No newline at end of file +} diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 43837ba34b..214f6918fd 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,9 +1,12 @@ Name: rabbitmq-server -Version: %{rpm_version} +Version: %%VERSION%% Release: 1 License: MPLv1.1 Group: Development/Libraries -Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz +Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz +Source1: rabbitmq-server.init +Source2: rabbitmq-server.wrapper +Source3: rabbitmq-server.logrotate URL: http://www.rabbitmq.com/ Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. %if 0%{?debian} @@ -12,7 +15,7 @@ BuildRequires: python, python-json %endif Requires: erlang, logrotate Packager: Hubert Plociniczak <hubert@lshift.net> -BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root Summary: The RabbitMQ server Requires(post): chkconfig Requires(pre): chkconfig initscripts @@ -22,10 +25,10 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _mandir /usr/share/man -%define _sbindir /usr/sbin -%define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") -%define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version} + +%define _erllibdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") +%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version} + %pre if [ $1 -gt 1 ]; then @@ -35,7 +38,7 @@ if [ $1 -gt 1 ]; then fi %prep -%setup -n %{name}-%{main_version} +%setup -n %{name}-%{version} %build make @@ -46,24 +49,23 @@ rm -rf %{buildroot} make install TARGET_DIR=%{_maindir} \ SBIN_DIR=%{buildroot}%{_sbindir} \ MAN_DIR=%{buildroot}%{_mandir} - VERSION=%{main_version} + VERSION=%{version} mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia mkdir -p %{buildroot}/var/log/rabbitmq mkdir -p %{buildroot}/etc/rc.d/init.d/ #Copy all necessary lib files etc. -cp ../init.d %{buildroot}/etc/rc.d/init.d/rabbitmq-server +install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server mv %{buildroot}/usr/sbin/rabbitmqctl %{buildroot}/usr/sbin/rabbitmqctl_real -cp ../rabbitmqctl_wrapper %{buildroot}/usr/sbin/rabbitmqctl -chmod 0755 %{buildroot}/usr/sbin/rabbitmqctl +install -m 0755 %SOURCE2 %{buildroot}/usr/sbin/rabbitmqctl cp %{buildroot}%{_mandir}/man1/rabbitmqctl.1.gz %{buildroot}%{_mandir}/man1/rabbitmqctl_real.1.gz mkdir -p %{buildroot}/etc/logrotate.d -cp ../rabbitmq-server.logrotate %{buildroot}/etc/logrotate.d/rabbitmq-server +install %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server %post # create rabbitmq group @@ -95,7 +97,7 @@ fi %files %defattr(-,root,root,-) -%{_libdir}/rabbitmq_server-%{main_version}/ +%{_erllibdir}/rabbitmq_server-%{version}/ %{_mandir}/man1/rabbitmq-multi.1.gz %{_mandir}/man1/rabbitmq-server.1.gz %{_mandir}/man1/rabbitmqctl.1.gz diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index dd74c31ea1..3e74cb5231 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -1,5 +1,6 @@ TARBALL_DIR=../../../dist TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz)) +DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') UNPACKED_DIR=rabbitmq-server-$(VERSION) PACKAGENAME=rabbitmq-server @@ -16,7 +17,8 @@ all: package: clean make -C ../.. check_tools - tar -zxvf $(TARBALL_DIR)/$(TARBALL) + cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL) + tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 675e15f490..749791a4bd 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> -Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python, python-json +Build-Depends: cdbs, debhelper (>= 5), erlang-nox, erlang-dev, python-json Standards-Version: 3.7.2 Package: rabbitmq-server diff --git a/packaging/debs/Debian/debian/rabbitmq-server.logrotate b/packaging/debs/Debian/debian/rabbitmq-server.logrotate index 247635d19a..bfd6b8da0b 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.logrotate +++ b/packaging/debs/Debian/debian/rabbitmq-server.logrotate @@ -9,4 +9,4 @@ postrotate /etc/init.d/rabbitmq-server rotate-logs endscript -}
\ No newline at end of file +} diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bd64f1e48d..7b2f801a31 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -295,25 +295,23 @@ ack(QPid, Txn, MsgIds, ChPid) -> commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( - fun (QPid) -> - rabbit_misc:with_exit_handler( - %% we don't care if the queue process has terminated - %% in the meantime - fun () -> ok end, - fun () -> gen_server:call(QPid, {notify_down, ChPid}, - Timeout) end) - end, + %% we don't care if the queue process has terminated in the + %% meantime + fun (_) -> ok end, + fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). binding_forcibly_removed(BindingSpec, QueueName) -> @@ -388,10 +386,13 @@ pseudo_queue(QueueName, Pid) -> binding_specs = [], pid = Pid}. -safe_pmap_ok(F, L) -> +safe_pmap_ok(H, F, L) -> case [R || R <- rabbit_misc:upmap( fun (V) -> - try F(V) + try + rabbit_misc:with_exit_handler( + fun () -> H(V) end, + fun () -> F(V) end) catch Class:Reason -> {Class, Reason} end end, L), @@ -399,4 +400,3 @@ safe_pmap_ok(F, L) -> [] -> ok; Errors -> {error, Errors} end. - diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 33fe047c39..b4e0fbab37 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -585,29 +585,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, nowait = NoWait, - arguments = Arguments}, - _, State = #ch{ virtual_host = VHostPath }) -> - %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), - ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case rabbit_amqqueue:add_binding(QueueName, ExchangeName, - ActualRoutingKey, Arguments) of - {error, queue_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); - {ok, _BindingCount} -> - return_ok(State, NoWait, #'queue.bind_ok'{}) - end; + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, + NoWait, State); + +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin, + QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, + false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -655,6 +644,36 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + %% FIXME: connection exception (!) on failure?? + %% (see rule named "failure" in spec-XML) + %% FIXME: don't allow binding to internal exchanges - + %% including the one named "" ! + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, + State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of + {error, queue_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(QueueName)]); + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + {error, binding_not_found} -> + rabbit_misc:protocol_error( + not_found, "no binding ~s between ~s and ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); + {error, durability_settings_incompatible} -> + rabbit_misc:protocol_error( + not_allowed, "durability settings of ~s incompatible with ~s", + [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); + {ok, _BindingCount} -> + return_ok(State, NoWait, ReturnMethod) + end. + publish(Mandatory, Immediate, Message, QPids, State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> Handled = deliver(QPids, Mandatory, Immediate, TxnKey, @@ -736,7 +755,8 @@ internal_commit(State = #ch{transaction_id = TxnKey, case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of ok -> new_tx(State); - {error, Errors} -> exit({commit_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "commit failed: ~w", [Errors]) end. internal_rollback(State = #ch{transaction_id = TxnKey, @@ -751,7 +771,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> exit({rollback_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "rollback failed: ~w", [Errors]) end. fold_per_queue(F, Acc0, UAQ) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3e4ed8f36f..89648f4f1e 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -34,7 +34,7 @@ -export([dirty_read/1]). -export([r/3, r/2, rs/1]). -export([enable_cover/0, report_cover/0]). --export([with_exit_handler/2]). +-export([throw_on_error/2, with_exit_handler/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). @@ -76,6 +76,8 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). +-spec(throw_on_error/2 :: + (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). @@ -197,6 +199,13 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +throw_on_error(E, Thunk) -> + case Thunk() of + {error, Reason} -> throw({E, Reason}); + {ok, Res} -> Res; + Res -> Res + end. + with_exit_handler(Handler, Thunk) -> try Thunk() diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7e68b3eddd..ce26c11a0b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -166,14 +166,27 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). + +peername(Sock) -> + try + {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + AddressS = inet_parse:ntoa(Address), + {AddressS, Port} + catch + Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Ex]), + rabbit_log:info("closing TCP connection ~p", [self()]), + exit(normal) + end. + start_connection(Parent, Deb, ClientSock) -> - ProfilingValue = setup_profiling(), process_flag(trap_exit, true), - {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock), - PeerAddressS = inet_parse:ntoa(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + {PeerAddressS, PeerPort} = peername(ClientSock), + ProfilingValue = setup_profiling(), try + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), mainloop(Parent, Deb, switch_callback( @@ -266,7 +279,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1), + Ref = inet_op(fun () -> prim_inet:async_recv( + OldState#v1.sock, Length, -1) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -472,7 +486,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, end; handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>), + ok = inet_op(fun () -> gen_tcp:send( + Sock, <<"AMQP",1,1, + ?PROTOCOL_VERSION_MAJOR, + ?PROTOCOL_VERSION_MINOR>>) end), throw({bad_header, Other}); handle_input(Callback, Data, _State) -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 41a8d64cb3..a233764766 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -150,11 +150,9 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> fun (QPid, {Routed, Handled}) -> case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, Txn, Message, QPid) of - true -> {true, [QPid | Handled]}; - false -> {true, Handled}; - {'EXIT', Reason} -> rabbit_log:warning("delivery to ~p failed:~n~p~n", - [QPid, Reason]), - {Routed, Handled} + true -> {true, [QPid | Handled]}; + false -> {true, Handled}; + {'EXIT', _Reason} -> {Routed, Handled} end end, {false, []}, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index dee24cd99b..a2688625be 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -131,12 +131,16 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> Channel, Content, FrameMax), [MethodFrame | ContentFrames]. +tcp_send(Sock, Data) -> + rabbit_misc:throw_on_error(inet_error, + fun () -> gen_tcp:send(Sock, Data) end). + internal_send_command(Sock, Channel, MethodRecord) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> - ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from |
