summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-10-06 17:59:31 +0100
committerBen Hood <0x6e6562@gmail.com>2008-10-06 17:59:31 +0100
commit6fba52a764bbb2b705ae759a2b47b1cba3b319ac (patch)
tree79e9bc747d440971714028115d1068b60b245d10 /src
parentb8530a54f96d0302b5ef45f6a6113b91a89ffb76 (diff)
parent5e6c6e50c0d8491a45dc20a806acd5ecb67c7110 (diff)
downloadrabbitmq-server-git-6fba52a764bbb2b705ae759a2b47b1cba3b319ac.tar.gz
Merged default into 18776
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl60
-rw-r--r--src/rabbit_channel.erl52
-rw-r--r--src/rabbit_exchange.erl1
-rw-r--r--src/rabbit_reader.erl40
-rw-r--r--src/rabbit_writer.erl11
5 files changed, 91 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a0d8d3082b..c7b0fd5a49 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -28,11 +28,11 @@
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
- stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4,
- commit/2, rollback/2]).
+ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, notify_down/2]).
+-export([notify_sent/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
-import(mnesia).
@@ -43,6 +43,8 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-define(CALL_TIMEOUT, 5000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -50,6 +52,8 @@
-type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
-type(qlen() :: {'ok', non_neg_integer()}).
-type(qfun(A) :: fun ((amqqueue()) -> A)).
+-type(ok_or_errors() ::
+ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
@@ -72,9 +76,9 @@
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit/2 :: (pid(), txn()) -> 'ok').
--spec(rollback/2 :: (pid(), txn()) -> 'ok').
--spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok').
+-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
@@ -210,14 +214,29 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
-commit(QPid, Txn) ->
- gen_server:call(QPid, {commit, Txn}).
-
-rollback(QPid, Txn) ->
- gen_server:cast(QPid, {rollback, Txn}).
-
-notify_down(#amqqueue{ pid = QPid }, ChPid) ->
- gen_server:call(QPid, {notify_down, ChPid}).
+commit_all(QPids, Txn) ->
+ Timeout = length(QPids) * ?CALL_TIMEOUT,
+ safe_pmap_ok(
+ fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
+ QPids).
+
+rollback_all(QPids, Txn) ->
+ safe_pmap_ok(
+ fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
+ QPids).
+
+notify_down_all(QPids, ChPid) ->
+ Timeout = length(QPids) * ?CALL_TIMEOUT,
+ safe_pmap_ok(
+ fun (QPid) ->
+ rabbit_misc:with_exit_handler(
+ %% we don't care if the queue process has terminated
+ %% in the meantime
+ fun () -> ok end,
+ fun () -> gen_server:call(QPid, {notify_down, ChPid},
+ Timeout) end)
+ end,
+ QPids).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
gen_server:call(QPid, {claim_queue, ReaderPid}).
@@ -270,3 +289,16 @@ pseudo_queue(QueueName, Pid) ->
auto_delete = false,
arguments = [],
pid = Pid}.
+
+safe_pmap_ok(F, L) ->
+ case [R || R <- rabbit_misc:upmap(
+ fun (V) ->
+ try F(V)
+ catch Class:Reason -> {Class, Reason}
+ end
+ end, L),
+ R =/= ok] of
+ [] -> ok;
+ Errors -> {error, Errors}
+ end.
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ddd0ecf484..c432ffd382 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -707,21 +707,6 @@ ack(ProxyPid, TxnKey, UAQ) ->
make_tx_id() -> rabbit_misc:guid().
-safe_pmap_set_ok(F, S) ->
- case lists:filter(fun (R) -> R =/= ok end,
- rabbit_misc:upmap(
- fun (V) ->
- try F(V)
- catch Class:Reason -> {Class, Reason}
- end
- end, sets:to_list(S))) of
- [] -> ok;
- Errors -> {error, Errors}
- end.
-
-notify_participants(F, TxnKey, Participants) ->
- safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants).
-
new_tx(State) ->
State#ch{transaction_id = make_tx_id(),
tx_participants = sets:new(),
@@ -729,8 +714,8 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
- case notify_participants(fun rabbit_amqqueue:commit/2,
- TxnKey, Participants) of
+ case rabbit_amqqueue:commit_all(sets:to_list(Participants),
+ TxnKey) of
ok -> new_tx(State);
{error, Errors} -> exit({commit_failed, Errors})
end.
@@ -743,8 +728,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
[self(),
queue:len(UAQ),
queue:len(UAMQ)]),
- case notify_participants(fun rabbit_amqqueue:rollback/2,
- TxnKey, Participants) of
+ case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
+ TxnKey) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
{error, Errors} -> exit({rollback_failed, Errors})
@@ -767,23 +752,18 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- safe_pmap_set_ok(
- fun (QueueName) ->
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:notify_down(Q, ProxyPid)
- end) of
- ok ->
- ok;
- {error, not_found} ->
- %% queue has been deleted in the meantime
- ok
- end
- end,
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)).
+ rabbit_amqqueue:notify_down_all(
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end],
+ ProxyPid).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 5b11836ad9..2c5aa3085c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -51,7 +51,6 @@
not_found() | {'error', 'unroutable' | 'not_delivered'}).
-type(bind_res() :: 'ok' |
{'error', 'queue_not_found' | 'exchange_not_found'}).
-
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
amqp_table()) -> exchange()).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1d11cbaaea..7e68b3eddd 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -59,6 +59,7 @@
%% all states, unless specified otherwise:
%% socket error -> *exit*
%% socket close -> *throw*
+%% writer send failure -> *throw*
%% forced termination -> *exit*
%% handshake_timeout -> *throw*
%% pre-init:
@@ -93,10 +94,18 @@
%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
-%% channel exit ->
-%% if abnormal exit then log error
-%% if last channel to exit then send connection.close_ok, start
-%% terminate_connection timer, *closing*
+%% channel exit with hard error
+%% -> log error, wait for channels to terminate forcefully, start
+%% terminate_connection timer, send close, *closed*
+%% channel exit with soft error
+%% -> log error, start terminate_channel timer, mark channel as
+%% closing
+%% if last channel to exit then send connection.close_ok,
+%% start terminate_connection timer, *closed*
+%% else *closing*
+%% channel exits normally
+%% -> if last channel to exit then send connection.close_ok,
+%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
%% receive connection.close_ok -> self() ! terminate_connection,
@@ -230,6 +239,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
+ {'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
+ throw(E);
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
{terminate_channel, Channel, Ref1} ->
@@ -288,24 +299,13 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
-handle_dependent_exit(Pid, Reason,
- State = #v1{connection_state = closing}) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel ->
- case Reason of
- normal -> ok;
- _ -> log_channel_error(closing, Channel, Reason)
- end,
- maybe_close(State)
- end;
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
- State;
+ maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> handle_exception(State, Channel, Reason)
+ Channel -> maybe_close(handle_exception(State, Channel, Reason))
end.
channel_cleanup(Pid) ->
@@ -362,13 +362,15 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State) ->
+maybe_close(State = #v1{connection_state = closing}) ->
case all_channels() of
[] -> ok = send_on_channel0(
State#v1.sock, #'connection.close_ok'{}),
close_connection(State);
_ -> State
- end.
+ end;
+maybe_close(State) ->
+ State.
handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index eda871ecc7..0f6bca91bc 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -153,10 +153,15 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
internal_send_command_async(Sock, Channel, MethodRecord) ->
- true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord)),
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)),
ok.
internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
- true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)),
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
+ Content, FrameMax)),
ok.
+
+port_cmd(Sock, Data) ->
+ try erlang:port_command(Sock, Data)
+ catch error:Error -> exit({writer, send_failed, Error})
+ end.