diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 6 |
2 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bd64f1e48d..7b2f801a31 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -295,25 +295,23 @@ ack(QPid, Txn, MsgIds, ChPid) -> commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( + fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( - fun (QPid) -> - rabbit_misc:with_exit_handler( - %% we don't care if the queue process has terminated - %% in the meantime - fun () -> ok end, - fun () -> gen_server:call(QPid, {notify_down, ChPid}, - Timeout) end) - end, + %% we don't care if the queue process has terminated in the + %% meantime + fun (_) -> ok end, + fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). binding_forcibly_removed(BindingSpec, QueueName) -> @@ -388,10 +386,13 @@ pseudo_queue(QueueName, Pid) -> binding_specs = [], pid = Pid}. -safe_pmap_ok(F, L) -> +safe_pmap_ok(H, F, L) -> case [R || R <- rabbit_misc:upmap( fun (V) -> - try F(V) + try + rabbit_misc:with_exit_handler( + fun () -> H(V) end, + fun () -> F(V) end) catch Class:Reason -> {Class, Reason} end end, L), @@ -399,4 +400,3 @@ safe_pmap_ok(F, L) -> [] -> ok; Errors -> {error, Errors} end. - diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a9278898ea..ef3a9f0ebf 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -717,7 +717,8 @@ internal_commit(State = #ch{transaction_id = TxnKey, case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of ok -> new_tx(State); - {error, Errors} -> exit({commit_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "commit failed: ~w", [Errors]) end. internal_rollback(State = #ch{transaction_id = TxnKey, @@ -732,7 +733,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> exit({rollback_failed, Errors}) + {error, Errors} -> rabbit_misc:protocol_error( + internal_error, "rollback failed: ~w", [Errors]) end. fold_per_queue(F, Acc0, UAQ) -> |
