diff options
| -rwxr-xr-x | scripts/rabbitmq-server | 5 | ||||
| -rw-r--r-- | scripts/rabbitmq-server.bat | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 60 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 11 |
7 files changed, 97 insertions, 75 deletions
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 80289d8ec3..b930c8edd8 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -28,7 +28,10 @@ [ "x" = "x$NODE_IP_ADDRESS" ] && NODE_IP_ADDRESS=0.0.0.0 [ "x" = "x$NODE_PORT" ] && NODE_PORT=5672 -ERL_ARGS="+K true +A30 -kernel inet_default_listen_options [{sndbuf,16384},{recbuf,4096}]" +ERL_ARGS="+K true +A30 \ +-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ +-kernel inet_default_connect_options [{nodelay,true}]" + CLUSTER_CONFIG_FILE=/etc/default/rabbitmq_cluster.config [ "x" = "x$LOG_BASE" ] && LOG_BASE=/var/log/rabbitmq diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 42c09fac74..f08027d237 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -99,7 +99,8 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia -s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{sndbuf, 16384}, {recbuf, 4096}]" ^
+-kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
+-kernel inet_default_connect_options "[{nodelay, true}]" ^
-rabbit tcp_listeners "[{\"%NODE_IP_ADDRESS%\", %NODE_PORT%}]" ^
-kernel error_logger {file,\""%LOG_BASE%/%NODENAME%.log"\"} ^
-sasl errlog_type error ^
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a0d8d3082b..c7b0fd5a49 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,11 +28,11 @@ -export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4, - commit/2, rollback/2]). + stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_down/2]). +-export([notify_sent/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). -import(mnesia). @@ -43,6 +43,8 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-define(CALL_TIMEOUT, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -50,6 +52,8 @@ -type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). +-type(ok_or_errors() :: + 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> @@ -72,9 +76,9 @@ -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit/2 :: (pid(), txn()) -> 'ok'). --spec(rollback/2 :: (pid(), txn()) -> 'ok'). --spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -210,14 +214,29 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). -commit(QPid, Txn) -> - gen_server:call(QPid, {commit, Txn}). - -rollback(QPid, Txn) -> - gen_server:cast(QPid, {rollback, Txn}). - -notify_down(#amqqueue{ pid = QPid }, ChPid) -> - gen_server:call(QPid, {notify_down, ChPid}). +commit_all(QPids, Txn) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + QPids). + +rollback_all(QPids, Txn) -> + safe_pmap_ok( + fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + QPids). + +notify_down_all(QPids, ChPid) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> + rabbit_misc:with_exit_handler( + %% we don't care if the queue process has terminated + %% in the meantime + fun () -> ok end, + fun () -> gen_server:call(QPid, {notify_down, ChPid}, + Timeout) end) + end, + QPids). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). @@ -270,3 +289,16 @@ pseudo_queue(QueueName, Pid) -> auto_delete = false, arguments = [], pid = Pid}. + +safe_pmap_ok(F, L) -> + case [R || R <- rabbit_misc:upmap( + fun (V) -> + try F(V) + catch Class:Reason -> {Class, Reason} + end + end, L), + R =/= ok] of + [] -> ok; + Errors -> {error, Errors} + end. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ddd0ecf484..c432ffd382 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -707,21 +707,6 @@ ack(ProxyPid, TxnKey, UAQ) -> make_tx_id() -> rabbit_misc:guid(). -safe_pmap_set_ok(F, S) -> - case lists:filter(fun (R) -> R =/= ok end, - rabbit_misc:upmap( - fun (V) -> - try F(V) - catch Class:Reason -> {Class, Reason} - end - end, sets:to_list(S))) of - [] -> ok; - Errors -> {error, Errors} - end. - -notify_participants(F, TxnKey, Participants) -> - safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants). - new_tx(State) -> State#ch{transaction_id = make_tx_id(), tx_participants = sets:new(), @@ -729,8 +714,8 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> - case notify_participants(fun rabbit_amqqueue:commit/2, - TxnKey, Participants) of + case rabbit_amqqueue:commit_all(sets:to_list(Participants), + TxnKey) of ok -> new_tx(State); {error, Errors} -> exit({commit_failed, Errors}) end. @@ -743,8 +728,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case notify_participants(fun rabbit_amqqueue:rollback/2, - TxnKey, Participants) of + case rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> exit({rollback_failed, Errors}) @@ -767,23 +752,18 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - safe_pmap_set_ok( - fun (QueueName) -> - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:notify_down(Q, ProxyPid) - end) of - ok -> - ok; - {error, not_found} -> - %% queue has been deleted in the meantime - ok - end - end, - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)). + rabbit_amqqueue:notify_down_all( + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end], + ProxyPid). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 5b11836ad9..2c5aa3085c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -51,7 +51,6 @@ not_found() | {'error', 'unroutable' | 'not_delivered'}). -type(bind_res() :: 'ok' | {'error', 'queue_not_found' | 'exchange_not_found'}). - -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1d11cbaaea..7e68b3eddd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -59,6 +59,7 @@ %% all states, unless specified otherwise: %% socket error -> *exit* %% socket close -> *throw* +%% writer send failure -> *throw* %% forced termination -> *exit* %% handshake_timeout -> *throw* %% pre-init: @@ -93,10 +94,18 @@ %% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* -%% channel exit -> -%% if abnormal exit then log error -%% if last channel to exit then send connection.close_ok, start -%% terminate_connection timer, *closing* +%% channel exit with hard error +%% -> log error, wait for channels to terminate forcefully, start +%% terminate_connection timer, send close, *closed* +%% channel exit with soft error +%% -> log error, start terminate_channel timer, mark channel as +%% closing +%% if last channel to exit then send connection.close_ok, +%% start terminate_connection timer, *closed* +%% else *closing* +%% channel exits normally +%% -> if last channel to exit then send connection.close_ok, +%% start terminate_connection timer, *closed* %% closed: %% socket close -> *terminate* %% receive connection.close_ok -> self() ! terminate_connection, @@ -230,6 +239,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); + {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> + throw(E); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -288,24 +299,13 @@ terminate_channel(Channel, Ref, State) -> end, State. -handle_dependent_exit(Pid, Reason, - State = #v1{connection_state = closing}) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> - case Reason of - normal -> ok; - _ -> log_channel_error(closing, Channel, Reason) - end, - maybe_close(State) - end; handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), - State; + maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> handle_exception(State, Channel, Reason) + Channel -> maybe_close(handle_exception(State, Channel, Reason)) end. channel_cleanup(Pid) -> @@ -362,13 +362,15 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State) -> +maybe_close(State = #v1{connection_state = closing}) -> case all_channels() of [] -> ok = send_on_channel0( State#v1.sock, #'connection.close_ok'{}), close_connection(State); _ -> State - end. + end; +maybe_close(State) -> + State. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index eda871ecc7..0f6bca91bc 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -153,10 +153,15 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. internal_send_command_async(Sock, Channel, MethodRecord) -> - true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord)), + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)), ok. internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> - true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)), + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)), ok. + +port_cmd(Sock, Data) -> + try erlang:port_command(Sock, Data) + catch error:Error -> exit({writer, send_failed, Error}) + end. |
