diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2015-11-17 15:37:39 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2015-11-17 15:37:39 +0300 |
| commit | 8928a29b43322e6dee9f3b5a6e9b546a93047a89 (patch) | |
| tree | 80dc562389c1d3ca4efd6e3b6be3acdf63976869 /src | |
| parent | 5931edc27eb3273945345dc3f9a784287df88ccd (diff) | |
| parent | 52ce9fa48c596424a39059d13aad3c3183d42cee (diff) | |
| download | rabbitmq-server-git-8928a29b43322e6dee9f3b5a6e9b546a93047a89.tar.gz | |
Merge branch 'master' into rabbitmq-server-351
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_control_main.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 20 |
2 files changed, 21 insertions, 15 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 2799d510d0..5d061252d0 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -769,8 +769,20 @@ call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) -> true -> lists:map(fun list_to_binary_utf8/1, Args); false -> Args end, - spawn_link(rabbit_cli, rpc_call, [Node, Mod, Fun, Args0, Ref = make_ref(), - Pid = self(), Timeout]), + Ref = make_ref(), + Pid = self(), + spawn_link( + fun () -> + case rabbit_cli:rpc_call(Node, Mod, Fun, Args0, + Ref, Pid, Timeout) of + {error, _} = Error -> + Pid ! {error, Error}; + {bad_argument, _} = Error -> + Pid ! {error, Error}; + _ -> + ok + end + end), rabbit_control_misc:wait_for_info_messages( Pid, Ref, InfoKeys, fun display_info_message/2, Timeout). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d0c566b811..225c21dd54 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -256,13 +256,10 @@ handle_cast({gm, Instruction}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> %% Asynchronous, non-"mandatory", deliver mode. - case Flow of - %% We are acking messages to the channel process that sent us - %% the message delivery. See - %% rabbit_amqqueue_process:handle_ch_down for more info. - flow -> credit_flow:ack(Sender); - noflow -> ok - end, + %% We are acking messages to the channel process that sent us + %% the message delivery. See + %% rabbit_amqqueue_process:handle_ch_down for more info. + maybe_flow_ack(Sender, Flow), noreply(maybe_enqueue_message(Delivery, State)); handle_cast({sync_start, Ref, Syncer}, @@ -658,10 +655,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% need to send an ack for these messages since the channel is waiting %% for one for the via-GM case and we will not now receive one. promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) -> - case Flow of - flow -> credit_flow:ack(Sender); - noflow -> ok - end, + maybe_flow_ack(Sender, Flow), Delivery#delivery{mandatory = false}. noreply(State) -> @@ -955,8 +949,8 @@ process_instruction({set_queue_mode, Mode}, BQS1 = BQ:set_queue_mode(Mode, BQS), {ok, State #state { backing_queue_state = BQS1 }}. -maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid); -maybe_flow_ack(_ChPid, noflow) -> ok. +maybe_flow_ack(Sender, flow) -> credit_flow:ack(Sender); +maybe_flow_ack(_Sender, noflow) -> ok. msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1} = |
