summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAyanda Dube <ayanda.dube@erlang-solutions.com>2015-10-16 01:48:53 +0100
committerAyanda Dube <ayanda.dube@erlang-solutions.com>2015-10-16 01:48:53 +0100
commitab7eeed0a1d2d6088da911a26bd6492ac31b9fe0 (patch)
tree340cda23c3fdc69f831eff84ac074b1566dc4709
parent6e98d2bb478d95fdf0efdacdd831ab88df9f8a35 (diff)
downloadrabbitmq-server-git-ab7eeed0a1d2d6088da911a26bd6492ac31b9fe0.tar.gz
Adds emitting_map_with_wrapper_fun/5 and emitting_map_with_wrapper_fun/6.
Fixes handling of emitted messages which use 'continue' tag. References #62
-rw-r--r--src/rabbit_control_main.erl26
1 files changed, 22 insertions, 4 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 2ddf496233..6f092fb8d1 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -20,7 +20,8 @@
-export([start/0, stop/0, parse_arguments/2, action/5, action/6,
sync_queue/1, cancel_sync_queue/1, become/1,
- purge_queue/1, emitting_map/4, emitting_map/5]).
+ purge_queue/1, emitting_map/4, emitting_map/5,
+ emitting_map_with_wrapper_fun/5, emitting_map_with_wrapper_fun/6]).
-import(rabbit_cli, [rpc_call/4, rpc_call/5, rpc_call/7]).
@@ -676,10 +677,12 @@ wait_for_info_messages(Ref, InfoItemKeys) when is_reference(Ref) ->
receive
{Ref, finished} -> ok;
{Ref, {timeout, T}} -> exit({error, {timeout, (T / 1000)}});
- {Ref, continue} -> wait_for_info_messages(Ref, InfoItemKeys);
{Ref, []} -> wait_for_info_messages(Ref, InfoItemKeys);
{Ref, Result} -> display_info_message(Result, InfoItemKeys),
wait_for_info_messages(Ref, InfoItemKeys);
+ {Ref, Result, continue} ->
+ display_info_message(Result, InfoItemKeys),
+ wait_for_info_messages(Ref, InfoItemKeys);
_ -> wait_for_info_messages(Ref, InfoItemKeys)
end.
@@ -788,9 +791,24 @@ list_to_binary_utf8(L) ->
emitting_map(AggregatorPid, Ref, Fun, List) ->
emitting_map(AggregatorPid, Ref, Fun, List, finished).
-emitting_map(AggregatorPid, Ref, Fun, List, Control) ->
+emitting_map(AggregatorPid, Ref, Fun, List, continue) ->
+ [AggregatorPid ! {Ref, Fun(Item), continue} || Item <- List];
+emitting_map(AggregatorPid, Ref, Fun, List, finished) ->
[AggregatorPid ! {Ref, Fun(Item)} || Item <- List],
- AggregatorPid ! {Ref, Control},
+ AggregatorPid ! {Ref, finished},
+ ok.
+
+emitting_map_with_wrapper_fun(AggregatorPid, Ref, Fun, WrapperFun, List) ->
+ emitting_map_with_wrapper_fun(AggregatorPid, Ref, Fun, WrapperFun, List,
+ finished).
+emitting_map_with_wrapper_fun(AggregatorPid, Ref, Fun, WrapperFun, List,
+ continue) ->
+ WrapperFun(fun(Item) -> AggregatorPid ! {Ref, Fun(Item), continue} end,
+ List);
+emitting_map_with_wrapper_fun(AggregatorPid, Ref, Fun, WrapperFun, List,
+ finished) ->
+ WrapperFun(fun(Item) -> AggregatorPid ! {Ref, Fun(Item)} end, List),
+ AggregatorPid ! {Ref, finished},
ok.
%% escape does C-style backslash escaping of non-printable ASCII