diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 65 | ||||
| -rw-r--r-- | test/cluster_SUITE.erl | 5 |
2 files changed, 38 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3fb76be5e8..4eead35c1d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -627,12 +627,12 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); -info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info). +info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). info(Q = #amqqueue{ state = crashed }, Items) -> info_down(Q, Items, crashed); info(#amqqueue{ pid = QPid }, Items) -> - case delegate:call(QPid, {info, Items}) of + case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -693,7 +693,8 @@ force_event_refresh(Ref) -> notify_policy_changed(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, policy_changed). -consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). +consumers(#amqqueue{ pid = QPid }) -> + delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). consumer_info_keys() -> ?CONSUMER_INFO_KEYS. @@ -721,7 +722,7 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) -> AckRequired, Prefetch, Args]) || {ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)]. -stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}). pid_of(#amqqueue{pid = Pid}) -> Pid. pid_of(VHost, QueueName) -> @@ -739,7 +740,7 @@ delete_immediately(QPids) -> ok. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) -> - delegate:call(QPid, {delete, IfUnused, IfEmpty, ActingUser}). + delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}). delete_crashed(Q) -> delete_crashed(Q, ?INTERNAL_USER). @@ -752,21 +753,24 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }, ActingUser) -> BQ:delete_crashed(Q), ok = internal_delete(QName, ActingUser). -purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> + delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}). -requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). +requeue(QPid, MsgIds, ChPid) -> + delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}). -ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). +ack(QPid, MsgIds, ChPid) -> + delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}). reject(QPid, Requeue, MsgIds, ChPid) -> - delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}). + delegate:invoke_no_result(QPid, {gen_server2, cast, [{reject, Requeue, MsgIds, ChPid}]}). notify_down_all(QPids, ChPid) -> notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT). notify_down_all(QPids, ChPid, Timeout) -> - case rpc:call(node(), delegate, call, - [QPids, {notify_down, ChPid}], Timeout) of + case rpc:call(node(), delegate, invoke, + [QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of {badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}}; {badrpc, Reason} -> {error, Reason}; {_, Bads} -> @@ -782,27 +786,29 @@ notify_down_all(QPids, ChPid, Timeout) -> end. activate_limit_all(QPids, ChPid) -> - delegate:cast(QPids, {activate_limit, ChPid}). + delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> - delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). + delegate:invoke_no_result(QPid, {gen_server2, cast, [{credit, ChPid, CTag, Credit, Drain}]}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> - delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). + delegate:invoke(QPid, {gen_server2, call, [{basic_get, ChPid, NoAck, LimiterPid}, infinity]}). basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser) -> ok = check_consume_arguments(QName, Args), - delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, - Args, OkMsg, ActingUser}). + delegate:invoke(QPid, {gen_server2, call, + [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg, ActingUser}, infinity]}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg, ActingUser) -> - delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}). + delegate:invoke(QPid, {gen_server2, call, + [{basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, infinity]}). notify_decorators(#amqqueue{pid = QPid}) -> - delegate:cast(QPid, notify_decorators). + delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}). notify_sent(QPid, ChPid) -> rabbit_amqqueue_common:notify_sent(QPid, ChPid). @@ -810,7 +816,7 @@ notify_sent(QPid, ChPid) -> notify_sent_queue_down(QPid) -> rabbit_amqqueue_common:notify_sent_queue_down(QPid). -resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). +resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}). internal_delete1(QueueName, OnlyDurable) -> ok = mnesia:delete({rabbit_queue, QueueName}), @@ -907,12 +913,17 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -update_mirroring(QPid) -> ok = delegate:cast(QPid, update_mirroring). +update_mirroring(QPid) -> + ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}). -sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, sync_mirrors); -sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). -cancel_sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, cancel_sync_mirrors); -cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). +sync_mirrors(#amqqueue{pid = QPid}) -> + delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}); +sync_mirrors(QPid) -> + delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}). +cancel_sync_mirrors(#amqqueue{pid = QPid}) -> + delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}); +cancel_sync_mirrors(QPid) -> + delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}). is_mirrored(Q) -> rabbit_mirror_queue_misc:is_mirrored(Q). @@ -1031,8 +1042,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) -> %% done with it. MMsg = {deliver, Delivery, false}, SMsg = {deliver, Delivery, true}, - delegate:cast(MPids, MMsg), - delegate:cast(SPids, SMsg), + delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}), + delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}), QPids. qpids([]) -> {[], []}; %% optimisation diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index 3dba65ae1f..4864989b6a 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -123,11 +123,6 @@ delegates_async1(_Config, SecondaryNode) -> ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender), await_response(2), - LocalPids = spawn_responders(node(), Responder, 10), - RemotePids = spawn_responders(SecondaryNode, Responder, 10), - ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender), - await_response(20), - passed. delegates_sync(Config) -> |
