diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-10-16 13:03:49 +0100 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-10-16 13:03:49 +0100 |
| commit | dbf935be377e50cad62a4cea2c68123e3eb566aa (patch) | |
| tree | aad7968df1ac50369fba94dafc1e30bd25de09bd | |
| parent | 86e2cc75e5f3e282f70b3a30cf319b63e0361057 (diff) | |
| parent | 07f212e2942af3f5eb50543e0e36ccda026ac383 (diff) | |
| download | rabbitmq-server-git-dbf935be377e50cad62a4cea2c68123e3eb566aa.tar.gz | |
Merged default into bug19250
| -rw-r--r-- | packaging/RPMS/Fedora/Makefile | 2 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 6 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 37 |
6 files changed, 87 insertions, 73 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 6cc3579bab..814c79f03c 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -5,7 +5,7 @@ SOURCE_TARBALL_DIR=../../../dist TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz TOP_DIR=$(shell pwd) RPM_VERSION=$(shell echo $(VERSION) | tr - _) -DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' +DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' --define 'debian 1' rpms: clean server diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 08694c096c..43837ba34b 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -6,6 +6,10 @@ Group: Development/Libraries Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz URL: http://www.rabbitmq.com/ Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. +%if 0%{?debian} +%else +BuildRequires: python, python-json +%endif Requires: erlang, logrotate Packager: Hubert Plociniczak <hubert@lshift.net> BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root @@ -18,13 +22,11 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. - %define _mandir /usr/share/man %define _sbindir /usr/sbin %define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().") %define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version} - %pre if [ $1 -gt 1 ]; then #Upgrade - stop and remove previous instance of rabbitmq-server init.d script diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index bc691bc7b6..675e15f490 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> -Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python +Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python, python-json Standards-Version: 3.7.2 Package: rabbitmq-server diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7ce350d86e..bd64f1e48d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,12 +28,12 @@ -export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4, - commit/2, rollback/2]). + stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_down/2]). +-export([notify_sent/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). -import(mnesia). @@ -44,6 +44,8 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-define(CALL_TIMEOUT, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -53,6 +55,9 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(bind_res() :: {'ok', non_neg_integer()} | {'error', 'queue_not_found' | 'exchange_not_found'}). +-type(ok_or_errors() :: + 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). + -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> @@ -81,9 +86,9 @@ -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit/2 :: (pid(), txn()) -> 'ok'). --spec(rollback/2 :: (pid(), txn()) -> 'ok'). --spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> @@ -287,14 +292,29 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). -commit(QPid, Txn) -> - gen_server:call(QPid, {commit, Txn}). - -rollback(QPid, Txn) -> - gen_server:cast(QPid, {rollback, Txn}). - -notify_down(#amqqueue{ pid = QPid }, ChPid) -> - gen_server:call(QPid, {notify_down, ChPid}). +commit_all(QPids, Txn) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + QPids). + +rollback_all(QPids, Txn) -> + safe_pmap_ok( + fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + QPids). + +notify_down_all(QPids, ChPid) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> + rabbit_misc:with_exit_handler( + %% we don't care if the queue process has terminated + %% in the meantime + fun () -> ok end, + fun () -> gen_server:call(QPid, {notify_down, ChPid}, + Timeout) end) + end, + QPids). binding_forcibly_removed(BindingSpec, QueueName) -> rabbit_misc:execute_mnesia_transaction( @@ -367,3 +387,16 @@ pseudo_queue(QueueName, Pid) -> arguments = [], binding_specs = [], pid = Pid}. + +safe_pmap_ok(F, L) -> + case [R || R <- rabbit_misc:upmap( + fun (V) -> + try F(V) + catch Class:Reason -> {Class, Reason} + end + end, L), + R =/= ok] of + [] -> ok; + Errors -> {error, Errors} + end. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4cce60e5a4..0bad01f817 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -731,21 +731,6 @@ ack(ProxyPid, TxnKey, UAQ) -> make_tx_id() -> rabbit_misc:guid(). -safe_pmap_set_ok(F, S) -> - case lists:filter(fun (R) -> R =/= ok end, - rabbit_misc:upmap( - fun (V) -> - try F(V) - catch Class:Reason -> {Class, Reason} - end - end, sets:to_list(S))) of - [] -> ok; - Errors -> {error, Errors} - end. - -notify_participants(F, TxnKey, Participants) -> - safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants). - new_tx(State) -> State#ch{transaction_id = make_tx_id(), tx_participants = sets:new(), @@ -753,8 +738,8 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> - case notify_participants(fun rabbit_amqqueue:commit/2, - TxnKey, Participants) of + case rabbit_amqqueue:commit_all(sets:to_list(Participants), + TxnKey) of ok -> new_tx(State); {error, Errors} -> exit({commit_failed, Errors}) end. @@ -767,8 +752,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case notify_participants(fun rabbit_amqqueue:rollback/2, - TxnKey, Participants) of + case rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> exit({rollback_failed, Errors}) @@ -791,23 +776,18 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - safe_pmap_set_ok( - fun (QueueName) -> - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:notify_down(Q, ProxyPid) - end) of - ok -> - ok; - {error, not_found} -> - %% queue has been deleted in the meantime - ok - end - end, - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)). + rabbit_amqqueue:notify_down_all( + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end], + ProxyPid). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bfd1ea72ff..7e68b3eddd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -94,10 +94,18 @@ %% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* -%% channel exit -> -%% if abnormal exit then log error -%% if last channel to exit then send connection.close_ok, start -%% terminate_connection timer, *closing* +%% channel exit with hard error +%% -> log error, wait for channels to terminate forcefully, start +%% terminate_connection timer, send close, *closed* +%% channel exit with soft error +%% -> log error, start terminate_channel timer, mark channel as +%% closing +%% if last channel to exit then send connection.close_ok, +%% start terminate_connection timer, *closed* +%% else *closing* +%% channel exits normally +%% -> if last channel to exit then send connection.close_ok, +%% start terminate_connection timer, *closed* %% closed: %% socket close -> *terminate* %% receive connection.close_ok -> self() ! terminate_connection, @@ -291,24 +299,13 @@ terminate_channel(Channel, Ref, State) -> end, State. -handle_dependent_exit(Pid, Reason, - State = #v1{connection_state = closing}) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> - case Reason of - normal -> ok; - _ -> log_channel_error(closing, Channel, Reason) - end, - maybe_close(State) - end; handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), - State; + maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> handle_exception(State, Channel, Reason) + Channel -> maybe_close(handle_exception(State, Channel, Reason)) end. channel_cleanup(Pid) -> @@ -365,13 +362,15 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State) -> +maybe_close(State = #v1{connection_state = closing}) -> case all_channels() of [] -> ok = send_on_channel0( State#v1.sock, #'connection.close_ok'{}), close_connection(State); _ -> State - end. + end; +maybe_close(State) -> + State. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> |
