diff options
| -rw-r--r-- | src/delegate.erl | 19 | ||||
| -rw-r--r-- | src/delegate_sup.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 11 |
4 files changed, 32 insertions, 8 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 68739324e6..f3c3f0974e 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -44,6 +44,25 @@ %%---------------------------------------------------------------------------- +-ifdef(use_specs). + +-type(serverref() :: atom() | {atom(), atom()} | {'global', term()} | pid()). + +-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). +-spec(cast/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). +-spec(call/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). + +-spec(gs2_call/3 :: + (serverref(), any(), non_neg_integer() | 'infinity') -> any()). +-spec(gs2_pcall/4 :: + (serverref(), number(), any(), non_neg_integer() | 'infinity') -> any()). +-spec(gs2_cast/2 :: (serverref(), any()) -> 'ok'). +-spec(gs2_pcast/3 :: (serverref(), number(), any()) -> 'ok'). + +-spec(server/1 :: (node() | non_neg_integer()) -> atom()). +-spec(process_count/0 :: () -> non_neg_integer()). + +-endif. %%---------------------------------------------------------------------------- diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index dd3d0eefff..1f35140626 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -41,6 +41,14 @@ %%---------------------------------------------------------------------------- +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1dd92403b0..efa62e1bc2 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -92,7 +92,7 @@ -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). --spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7d3cd7225d..1f16ec080e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -944,13 +944,10 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey, self()) of - ok -> NewUAMQ = queue:join(UAQ, UAMQ), - new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> rabbit_misc:protocol_error( - internal_error, "rollback failed: ~w", [Errors]) - end. + ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey, self()), + NewUAMQ = queue:join(UAQ, UAMQ), + new_tx(State#ch{unacked_message_q = NewUAMQ}). rollback_and_notify(State = #ch{transaction_id = none}) -> notify_queues(State); |
