summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/test-erlang-otp-21.3.yaml95
-rw-r--r--.github/workflows/test-erlang-otp-22.3.yaml95
-rw-r--r--src/transducers.erl113
-rw-r--r--test/unit_transducers_SUITE.erl67
4 files changed, 370 insertions, 0 deletions
diff --git a/.github/workflows/test-erlang-otp-21.3.yaml b/.github/workflows/test-erlang-otp-21.3.yaml
index ed6bffd5b0..9313f01fa1 100644
--- a/.github/workflows/test-erlang-otp-21.3.yaml
+++ b/.github/workflows/test-erlang-otp-21.3.yaml
@@ -8168,6 +8168,100 @@ jobs:
printf "\n\n"
fi
done # vim:sw=2:et:
+ ct-unit_transducers:
+ needs: [checks]
+ # https://help.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#contexts
+ name: ct-unit_transducers
+ runs-on: ubuntu-18.04
+ steps:
+ - name: CHECKOUT REPOSITORY
+ uses: actions/checkout@v2
+ # https://github.com/marketplace/actions/setup-elixir
+ - name: CONFIGURE OTP & ELIXIR
+ uses: actions/setup-elixir@v1
+ with:
+ otp-version: 21.3
+ # https://github.com/elixir-lang/elixir/releases
+ elixir-version: 1.10.3
+ - name: DOWNLOAD DEPS ARCHIVE
+ uses: actions/download-artifact@v2
+ with:
+ name: deps.tar.xz
+ - name: UNPACK DEPS ARCHIVE
+ run: |
+ tar Jxf deps.tar.xz
+ - name: RUN TESTS
+ run: |
+ branch_or_tag_name=${GITHUB_REF#refs/*/}
+ ! test -d ebin || touch ebin/*
+ export BASE_RMQ_REF=master
+ export ERLANG_VERSION=21.3
+ export ELIXIR_VERSION=1.10.3
+ make ct-unit_transducers \
+ base_rmq_ref=master \
+ current_rmq_ref=$branch_or_tag_name \
+ FULL= \
+ FAIL_FAST=1 \
+ SKIP_AS_ERROR=1 \
+ CT_OPTS="-ct_hooks honeycomb_cth '[{directory,\"$PWD/honeycomb\"}]'"
+ - name: DOWNLOAD SECONDARY UMBRELLAS ARCHIVE
+ if: success() && 'oldest' == 'oldest'
+ uses: actions/download-artifact@v2
+ with:
+ name: secondary-umbrellas.tar.xz
+ - name: UNPACK SECONDARY UMBRELLAS ARCHIVE
+ if: success() && 'oldest' == 'oldest'
+ run: |
+ set -ex
+ tar Jxf secondary-umbrellas.tar.xz
+ rm secondary-umbrellas.tar.xz
+ - name: RUN TESTS [mixed-versions]
+ if: success() && 'oldest' == 'oldest'
+ run: |
+ set -ex
+ branch_or_tag_name=${GITHUB_REF#refs/*/}
+ for umbrella in umbrellas/*; do
+ test -d "$umbrella"
+ printf '\n\033[1;32mMixing clusters with RabbitMQ %s\033[0m' \
+ $(basename "$umbrella")
+ make distclean-ct ct-unit_transducers \
+ base_rmq_ref=master \
+ current_rmq_ref=$branch_or_tag_name \
+ FULL= \
+ FAIL_FAST=1 \
+ SKIP_AS_ERROR=1 \
+ SECONDARY_UMBRELLA=$PWD/$umbrella \
+ RABBITMQ_FEATURE_FLAGS= \
+ CT_OPTS="-ct_hooks honeycomb_cth '[{directory,\"$PWD/honeycomb\"}]'"
+ done
+ - name: ON FAILURE ARCHIVE TESTS LOGS
+ if: failure()
+ run: |
+ make ct-logs-archive
+ - name: ON FAILURE UPLOAD TESTS LOGS ARTIFACT
+ # https://github.com/marketplace/actions/upload-artifact
+ uses: actions/upload-artifact@v2-preview
+ if: failure()
+ with:
+ name: ct-unit_transducers-logs
+ path: "*-ct-logs-*.tar.xz"
+ - name: HONEYCOMB
+ if: success() || failure()
+ run: |
+ echo "$(ls honeycomb | wc -l) events recorded"
+ for f in honeycomb/*; do
+ RC=$(curl --silent \
+ -H 'X-Honeycomb-Team: ${{ secrets.HONEYCOMB_TEAM }}' \
+ -d @${f} \
+ -o /dev/null \
+ -w "%{http_code}" \
+ "https://api.honeycomb.io/1/events/rabbitmq-ci")
+ if [ "$RC" != "200" ]; then
+ echo "Honeycomb returned ${RC}"
+ cat ${f}
+ printf "\n\n"
+ fi
+ done # vim:sw=2:et:
ct-unit_vm_memory_monitor:
needs: [checks]
# https://help.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#contexts
@@ -8539,6 +8633,7 @@ jobs:
- ct-unit_queue_consumers
- ct-unit_stats_and_metrics
- ct-unit_supervisor2
+ - ct-unit_transducers
- ct-unit_vm_memory_monitor
- ct-upgrade_preparation
- ct-vhost
diff --git a/.github/workflows/test-erlang-otp-22.3.yaml b/.github/workflows/test-erlang-otp-22.3.yaml
index 70cbb10469..7da28868d3 100644
--- a/.github/workflows/test-erlang-otp-22.3.yaml
+++ b/.github/workflows/test-erlang-otp-22.3.yaml
@@ -8168,6 +8168,100 @@ jobs:
printf "\n\n"
fi
done # vim:sw=2:et:
+ ct-unit_transducers:
+ needs: [checks]
+ # https://help.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#contexts
+ name: ct-unit_transducers
+ runs-on: ubuntu-18.04
+ steps:
+ - name: CHECKOUT REPOSITORY
+ uses: actions/checkout@v2
+ # https://github.com/marketplace/actions/setup-elixir
+ - name: CONFIGURE OTP & ELIXIR
+ uses: actions/setup-elixir@v1
+ with:
+ otp-version: 22.3
+ # https://github.com/elixir-lang/elixir/releases
+ elixir-version: 1.10.3
+ - name: DOWNLOAD DEPS ARCHIVE
+ uses: actions/download-artifact@v2
+ with:
+ name: deps.tar.xz
+ - name: UNPACK DEPS ARCHIVE
+ run: |
+ tar Jxf deps.tar.xz
+ - name: RUN TESTS
+ run: |
+ branch_or_tag_name=${GITHUB_REF#refs/*/}
+ ! test -d ebin || touch ebin/*
+ export BASE_RMQ_REF=master
+ export ERLANG_VERSION=22.3
+ export ELIXIR_VERSION=1.10.3
+ make ct-unit_transducers \
+ base_rmq_ref=master \
+ current_rmq_ref=$branch_or_tag_name \
+ FULL= \
+ FAIL_FAST=1 \
+ SKIP_AS_ERROR=1 \
+ CT_OPTS="-ct_hooks honeycomb_cth '[{directory,\"$PWD/honeycomb\"}]'"
+ - name: DOWNLOAD SECONDARY UMBRELLAS ARCHIVE
+ if: success() && 'latest' == 'oldest'
+ uses: actions/download-artifact@v2
+ with:
+ name: secondary-umbrellas.tar.xz
+ - name: UNPACK SECONDARY UMBRELLAS ARCHIVE
+ if: success() && 'latest' == 'oldest'
+ run: |
+ set -ex
+ tar Jxf secondary-umbrellas.tar.xz
+ rm secondary-umbrellas.tar.xz
+ - name: RUN TESTS [mixed-versions]
+ if: success() && 'latest' == 'oldest'
+ run: |
+ set -ex
+ branch_or_tag_name=${GITHUB_REF#refs/*/}
+ for umbrella in umbrellas/*; do
+ test -d "$umbrella"
+ printf '\n\033[1;32mMixing clusters with RabbitMQ %s\033[0m' \
+ $(basename "$umbrella")
+ make distclean-ct ct-unit_transducers \
+ base_rmq_ref=master \
+ current_rmq_ref=$branch_or_tag_name \
+ FULL= \
+ FAIL_FAST=1 \
+ SKIP_AS_ERROR=1 \
+ SECONDARY_UMBRELLA=$PWD/$umbrella \
+ RABBITMQ_FEATURE_FLAGS= \
+ CT_OPTS="-ct_hooks honeycomb_cth '[{directory,\"$PWD/honeycomb\"}]'"
+ done
+ - name: ON FAILURE ARCHIVE TESTS LOGS
+ if: failure()
+ run: |
+ make ct-logs-archive
+ - name: ON FAILURE UPLOAD TESTS LOGS ARTIFACT
+ # https://github.com/marketplace/actions/upload-artifact
+ uses: actions/upload-artifact@v2-preview
+ if: failure()
+ with:
+ name: ct-unit_transducers-logs
+ path: "*-ct-logs-*.tar.xz"
+ - name: HONEYCOMB
+ if: success() || failure()
+ run: |
+ echo "$(ls honeycomb | wc -l) events recorded"
+ for f in honeycomb/*; do
+ RC=$(curl --silent \
+ -H 'X-Honeycomb-Team: ${{ secrets.HONEYCOMB_TEAM }}' \
+ -d @${f} \
+ -o /dev/null \
+ -w "%{http_code}" \
+ "https://api.honeycomb.io/1/events/rabbitmq-ci")
+ if [ "$RC" != "200" ]; then
+ echo "Honeycomb returned ${RC}"
+ cat ${f}
+ printf "\n\n"
+ fi
+ done # vim:sw=2:et:
ct-unit_vm_memory_monitor:
needs: [checks]
# https://help.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#contexts
@@ -8539,6 +8633,7 @@ jobs:
- ct-unit_queue_consumers
- ct-unit_stats_and_metrics
- ct-unit_supervisor2
+ - ct-unit_transducers
- ct-unit_vm_memory_monitor
- ct-upgrade_preparation
- ct-vhost
diff --git a/src/transducers.erl b/src/transducers.erl
new file mode 100644
index 0000000000..cd5d12eddc
--- /dev/null
+++ b/src/transducers.erl
@@ -0,0 +1,113 @@
+-module(transducers).
+
+-export([transduce/3,
+ transduce/4,
+ map/1,
+ map_worker_pool/2]).
+
+-export([worker_loop/3]).
+
+-type reducer() :: fun(({} | {any()} | {any(), any()}) -> any()).
+-type transducer() :: fun((reducer()) -> reducer()).
+
+-spec transduce(transducer(), reducer(), list()) -> any().
+transduce(Xform, F, List) ->
+ transduce(Xform, F, F({}), List).
+
+-spec transduce(transducer(), reducer(), any(), list()) -> any().
+transduce(Xform, F, Init, List) ->
+ transduce0(List, Xform(F), Init).
+
+transduce0([], Xf, Acc) ->
+ Xf({Acc});
+transduce0([H | Rest], Xf, Acc) ->
+ transduce0(Rest, Xf, Xf({Acc, H})).
+
+-spec map(fun()) -> transducer().
+map(Fun) ->
+ fun (Rf) ->
+ fun
+ ({}) ->
+ Rf({});
+ ({Result}) ->
+ Rf({Result});
+ ({Result, Input}) ->
+ Rf({Result, Fun(Input)})
+ end
+ end.
+
+%% worker_map
+%% worker_map uses a worker pool to transform the values in the sequence. It may reorder entries.
+-spec map_worker_pool(integer(), fun()) -> transducer().
+map_worker_pool(WorkerCount, Fun) ->
+ fun (Rf) ->
+ JobsOutstanding = counters:new(1, []),
+ WorkersEts = ets:new(?MODULE, []),
+ WorkerIds = lists:seq(1, WorkerCount),
+ [erlang:monitor(process,
+ spawn(?MODULE,
+ worker_loop,
+ [Id, Fun, self()]))
+ || Id <- WorkerIds],
+ [receive
+ {ready, Id, Pid} ->
+ ets:insert(WorkersEts, {Id, free, Pid})
+ end || Id <- WorkerIds],
+ fun
+ ({}) ->
+ Rf({});
+ ({Result}) ->
+ NewResult = (fun ReduceRemaining(0, Acc) ->
+ Acc;
+ ReduceRemaining(Count, Acc) ->
+ receive
+ {result, _, WorkerResult} ->
+ ReduceRemaining(Count - 1,
+ Rf({Acc, WorkerResult}))
+ end
+ end)(counters:get(JobsOutstanding, 1), Result),
+ ets:foldl(fun ({_, _, Pid}, _) -> Pid ! finish end, ok, WorkersEts),
+ ets:delete(WorkersEts),
+ Rf({NewResult});
+ ({Result, Input}) ->
+ counters:add(JobsOutstanding, 1, 1),
+ case free_worker(WorkersEts) of
+ {FreeWorkerId, FreeWorkerPid} ->
+ %% io:format("Found free worker ~p with pid ~p~n", [FreeWorkerId, FreeWorkerPid]),
+ true = ets:update_element(WorkersEts, FreeWorkerId, {2, busy}),
+ FreeWorkerPid ! {work, self(), Input},
+ Result;
+ none ->
+ %% io:format("No free workers...~n", []),
+ % should we check for dead workers?
+ receive
+ {result, WorkerId, WorkerResult} ->
+ counters:sub(JobsOutstanding, 1, 1),
+ true = ets:update_element(WorkersEts, WorkerId, {2, free}),
+ {FreeWorkerId, FreeWorkerPid} = free_worker(WorkersEts),
+ %% io:format("Found free worker ~p with pid ~p~n", [FreeWorkerId, FreeWorkerPid]),
+ true = ets:update_element(WorkersEts, FreeWorkerId, {2, busy}),
+ FreeWorkerPid ! {work, self(), Input},
+ Rf({Result, WorkerResult})
+ end
+ end
+ end
+ end.
+
+free_worker(WorkersEts) ->
+ case ets:match(WorkersEts, {'$1', free, '$2'}, 1) of
+ '$end_of_table' -> none;
+ {[[Id, Pid]], _} -> {Id, Pid}
+ end.
+
+worker_loop(Id, Fun, CoordinatorPid) ->
+ CoordinatorPid ! {ready, Id, self()},
+ (fun WorkerLoop() ->
+ receive
+ {work, Dest, Input} ->
+ Dest ! {result, Id, Fun(Input)},
+ WorkerLoop();
+ finish ->
+ ok
+ end
+ end)().
diff --git a/test/unit_transducers_SUITE.erl b/test/unit_transducers_SUITE.erl
new file mode 100644
index 0000000000..ec64a4aa38
--- /dev/null
+++ b/test/unit_transducers_SUITE.erl
@@ -0,0 +1,67 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(unit_transducers_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, map_tests},
+ {group, map_worker_pool_tests}
+ ].
+
+groups() ->
+ [
+ {map_tests, [parallel], [map_transducer]},
+ {map_worker_pool_tests, [parallel], [map_worker_pool_transducer]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 1}}
+ ].
+
+%% ---------------------------------------------------------------------------
+
+map_transducer(_Config) ->
+ Xf = transducers:map(fun (I) -> I + 1 end),
+ ?assertEqual([2, 3, 4], transducers:transduce(Xf,
+ fun into_list/1,
+ lists:seq(1, 3))).
+
+map_worker_pool_transducer(_Config) ->
+ Xf = transducers:map_worker_pool(2, fun (I) ->
+ timer:sleep(200),
+ I + I
+ end),
+ {Time, Value} = timer:tc(transducers, transduce,
+ [Xf, fun into_list/1, lists:seq(1, 3)]),
+ ?assertEqual([2, 4, 6], lists:sort(Value)),
+ ?assertMatch(T when T < 600000, Time,
+ "Should complete faster than serially"),
+ ?assertMatch(T when T > 400000, Time,
+ "But not faster than the worker count allows").
+
+%% ---------------------------------------------------------------------------
+
+into_list({}) -> [];
+into_list({Acc}) -> Acc;
+into_list({Acc, I}) -> Acc ++ [I].