summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2015-11-17 15:37:39 +0300
committerMichael Klishin <michael@clojurewerkz.org>2015-11-17 15:37:39 +0300
commit8928a29b43322e6dee9f3b5a6e9b546a93047a89 (patch)
tree80dc562389c1d3ca4efd6e3b6be3acdf63976869 /src
parent5931edc27eb3273945345dc3f9a784287df88ccd (diff)
parent52ce9fa48c596424a39059d13aad3c3183d42cee (diff)
downloadrabbitmq-server-git-8928a29b43322e6dee9f3b5a6e9b546a93047a89.tar.gz
Merge branch 'master' into rabbitmq-server-351
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_control_main.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl20
2 files changed, 21 insertions, 15 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 2799d510d0..5d061252d0 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -769,8 +769,20 @@ call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
true -> lists:map(fun list_to_binary_utf8/1, Args);
false -> Args
end,
- spawn_link(rabbit_cli, rpc_call, [Node, Mod, Fun, Args0, Ref = make_ref(),
- Pid = self(), Timeout]),
+ Ref = make_ref(),
+ Pid = self(),
+ spawn_link(
+ fun () ->
+ case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
+ Ref, Pid, Timeout) of
+ {error, _} = Error ->
+ Pid ! {error, Error};
+ {bad_argument, _} = Error ->
+ Pid ! {error, Error};
+ _ ->
+ ok
+ end
+ end),
rabbit_control_misc:wait_for_info_messages(
Pid, Ref, InfoKeys, fun display_info_message/2, Timeout).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d0c566b811..225c21dd54 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -256,13 +256,10 @@ handle_cast({gm, Instruction}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
State) ->
%% Asynchronous, non-"mandatory", deliver mode.
- case Flow of
- %% We are acking messages to the channel process that sent us
- %% the message delivery. See
- %% rabbit_amqqueue_process:handle_ch_down for more info.
- flow -> credit_flow:ack(Sender);
- noflow -> ok
- end,
+ %% We are acking messages to the channel process that sent us
+ %% the message delivery. See
+ %% rabbit_amqqueue_process:handle_ch_down for more info.
+ maybe_flow_ack(Sender, Flow),
noreply(maybe_enqueue_message(Delivery, State));
handle_cast({sync_start, Ref, Syncer},
@@ -658,10 +655,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% need to send an ack for these messages since the channel is waiting
%% for one for the via-GM case and we will not now receive one.
promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
- case Flow of
- flow -> credit_flow:ack(Sender);
- noflow -> ok
- end,
+ maybe_flow_ack(Sender, Flow),
Delivery#delivery{mandatory = false}.
noreply(State) ->
@@ -955,8 +949,8 @@ process_instruction({set_queue_mode, Mode},
BQS1 = BQ:set_queue_mode(Mode, BQS),
{ok, State #state { backing_queue_state = BQS1 }}.
-maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid);
-maybe_flow_ack(_ChPid, noflow) -> ok.
+maybe_flow_ack(Sender, flow) -> credit_flow:ack(Sender);
+maybe_flow_ack(_Sender, noflow) -> ok.
msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =