summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2017-06-30 16:53:31 +0100
committerDiana Corbacho <diana@rabbitmq.com>2017-06-30 16:53:31 +0100
commit1b6063692b842195c101fe9eafad70da5e33af4b (patch)
treea621d3f6456346ebe577fa51754e8a4a01248e61
parent042122db18179f4cbac6b8805e5f024dd0079bc6 (diff)
downloadrabbitmq-server-git-1b6063692b842195c101fe9eafad70da5e33af4b.tar.gz
Use delegate:invoke and delegate:invoke_no_result
Further simplifications to rabbitmq-server#208 which where too large to apply in 3.6.11. Now invoke and invoke_no_result are called directly
-rw-r--r--src/rabbit_amqqueue.erl65
-rw-r--r--test/cluster_SUITE.erl5
2 files changed, 38 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3fb76be5e8..4eead35c1d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -627,12 +627,12 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
-info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
+info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
info(Q = #amqqueue{ state = crashed }, Items) ->
info_down(Q, Items, crashed);
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate:call(QPid, {info, Items}) of
+ case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -693,7 +693,8 @@ force_event_refresh(Ref) ->
notify_policy_changed(#amqqueue{pid = QPid}) ->
gen_server2:cast(QPid, policy_changed).
-consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
+consumers(#amqqueue{ pid = QPid }) ->
+ delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -721,7 +722,7 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
AckRequired, Prefetch, Args]) ||
{ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)].
-stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
pid_of(#amqqueue{pid = Pid}) -> Pid.
pid_of(VHost, QueueName) ->
@@ -739,7 +740,7 @@ delete_immediately(QPids) ->
ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) ->
- delegate:call(QPid, {delete, IfUnused, IfEmpty, ActingUser}).
+ delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}).
delete_crashed(Q) ->
delete_crashed(Q, ?INTERNAL_USER).
@@ -752,21 +753,24 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }, ActingUser) ->
BQ:delete_crashed(Q),
ok = internal_delete(QName, ActingUser).
-purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) ->
+ delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}).
-requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, MsgIds, ChPid) ->
+ delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}).
-ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) ->
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}).
reject(QPid, Requeue, MsgIds, ChPid) ->
- delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [{reject, Requeue, MsgIds, ChPid}]}).
notify_down_all(QPids, ChPid) ->
notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT).
notify_down_all(QPids, ChPid, Timeout) ->
- case rpc:call(node(), delegate, call,
- [QPids, {notify_down, ChPid}], Timeout) of
+ case rpc:call(node(), delegate, invoke,
+ [QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of
{badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}};
{badrpc, Reason} -> {error, Reason};
{_, Bads} ->
@@ -782,27 +786,29 @@ notify_down_all(QPids, ChPid, Timeout) ->
end.
activate_limit_all(QPids, ChPid) ->
- delegate:cast(QPids, {activate_limit, ChPid}).
+ delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}).
credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
- delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [{credit, ChPid, CTag, Credit, Drain}]}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
- delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
+ delegate:invoke(QPid, {gen_server2, call, [{basic_get, ChPid, NoAck, LimiterPid}, infinity]}).
basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser) ->
ok = check_consume_arguments(QName, Args),
- delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
- Args, OkMsg, ActingUser}).
+ delegate:invoke(QPid, {gen_server2, call,
+ [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
+ Args, OkMsg, ActingUser}, infinity]}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg, ActingUser) ->
- delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}).
+ delegate:invoke(QPid, {gen_server2, call,
+ [{basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, infinity]}).
notify_decorators(#amqqueue{pid = QPid}) ->
- delegate:cast(QPid, notify_decorators).
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).
notify_sent(QPid, ChPid) ->
rabbit_amqqueue_common:notify_sent(QPid, ChPid).
@@ -810,7 +816,7 @@ notify_sent(QPid, ChPid) ->
notify_sent_queue_down(QPid) ->
rabbit_amqqueue_common:notify_sent_queue_down(QPid).
-resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
+resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}).
internal_delete1(QueueName, OnlyDurable) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
@@ -907,12 +913,17 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-update_mirroring(QPid) -> ok = delegate:cast(QPid, update_mirroring).
+update_mirroring(QPid) ->
+ ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}).
-sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, sync_mirrors);
-sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
-cancel_sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, cancel_sync_mirrors);
-cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+sync_mirrors(#amqqueue{pid = QPid}) ->
+ delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]});
+sync_mirrors(QPid) ->
+ delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}).
+cancel_sync_mirrors(#amqqueue{pid = QPid}) ->
+ delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]});
+cancel_sync_mirrors(QPid) ->
+ delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}).
is_mirrored(Q) ->
rabbit_mirror_queue_misc:is_mirrored(Q).
@@ -1031,8 +1042,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) ->
%% done with it.
MMsg = {deliver, Delivery, false},
SMsg = {deliver, Delivery, true},
- delegate:cast(MPids, MMsg),
- delegate:cast(SPids, SMsg),
+ delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
+ delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}),
QPids.
qpids([]) -> {[], []}; %% optimisation
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index 3dba65ae1f..4864989b6a 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -123,11 +123,6 @@ delegates_async1(_Config, SecondaryNode) ->
ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
await_response(2),
- LocalPids = spawn_responders(node(), Responder, 10),
- RemotePids = spawn_responders(SecondaryNode, Responder, 10),
- ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
- await_response(20),
-
passed.
delegates_sync(Config) ->