summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_channel.erl6
2 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index bd64f1e48d..7b2f801a31 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -295,25 +295,23 @@ ack(QPid, Txn, MsgIds, ChPid) ->
commit_all(QPids, Txn) ->
Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
+ fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
QPids).
rollback_all(QPids, Txn) ->
safe_pmap_ok(
+ fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
QPids).
notify_down_all(QPids, ChPid) ->
Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
- fun (QPid) ->
- rabbit_misc:with_exit_handler(
- %% we don't care if the queue process has terminated
- %% in the meantime
- fun () -> ok end,
- fun () -> gen_server:call(QPid, {notify_down, ChPid},
- Timeout) end)
- end,
+ %% we don't care if the queue process has terminated in the
+ %% meantime
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
QPids).
binding_forcibly_removed(BindingSpec, QueueName) ->
@@ -388,10 +386,13 @@ pseudo_queue(QueueName, Pid) ->
binding_specs = [],
pid = Pid}.
-safe_pmap_ok(F, L) ->
+safe_pmap_ok(H, F, L) ->
case [R || R <- rabbit_misc:upmap(
fun (V) ->
- try F(V)
+ try
+ rabbit_misc:with_exit_handler(
+ fun () -> H(V) end,
+ fun () -> F(V) end)
catch Class:Reason -> {Class, Reason}
end
end, L),
@@ -399,4 +400,3 @@ safe_pmap_ok(F, L) ->
[] -> ok;
Errors -> {error, Errors}
end.
-
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a9278898ea..ef3a9f0ebf 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -717,7 +717,8 @@ internal_commit(State = #ch{transaction_id = TxnKey,
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
ok -> new_tx(State);
- {error, Errors} -> exit({commit_failed, Errors})
+ {error, Errors} -> rabbit_misc:protocol_error(
+ internal_error, "commit failed: ~w", [Errors])
end.
internal_rollback(State = #ch{transaction_id = TxnKey,
@@ -732,7 +733,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
TxnKey) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
- {error, Errors} -> exit({rollback_failed, Errors})
+ {error, Errors} -> rabbit_misc:protocol_error(
+ internal_error, "rollback failed: ~w", [Errors])
end.
fold_per_queue(F, Acc0, UAQ) ->