diff options
-rw-r--r-- | .github/workflows/test-erlang-otp-21.3.yaml | 95 | ||||
-rw-r--r-- | .github/workflows/test-erlang-otp-22.3.yaml | 95 | ||||
-rw-r--r-- | src/transducers.erl | 113 | ||||
-rw-r--r-- | test/unit_transducers_SUITE.erl | 67 |
4 files changed, 370 insertions, 0 deletions
diff --git a/.github/workflows/test-erlang-otp-21.3.yaml b/.github/workflows/test-erlang-otp-21.3.yaml index ed6bffd5b0..9313f01fa1 100644 --- a/.github/workflows/test-erlang-otp-21.3.yaml +++ b/.github/workflows/test-erlang-otp-21.3.yaml @@ -8168,6 +8168,100 @@ jobs: printf "\n\n" fi done # vim:sw=2:et: + ct-unit_transducers: + needs: [checks] + # https://help.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#contexts + name: ct-unit_transducers + runs-on: ubuntu-18.04 + steps: + - name: CHECKOUT REPOSITORY + uses: actions/checkout@v2 + # https://github.com/marketplace/actions/setup-elixir + - name: CONFIGURE OTP & ELIXIR + uses: actions/setup-elixir@v1 + with: + otp-version: 21.3 + # https://github.com/elixir-lang/elixir/releases + elixir-version: 1.10.3 + - name: DOWNLOAD DEPS ARCHIVE + uses: actions/download-artifact@v2 + with: + name: deps.tar.xz + - name: UNPACK DEPS ARCHIVE + run: | + tar Jxf deps.tar.xz + - name: RUN TESTS + run: | + branch_or_tag_name=${GITHUB_REF#refs/*/} + ! test -d ebin || touch ebin/* + export BASE_RMQ_REF=master + export ERLANG_VERSION=21.3 + export ELIXIR_VERSION=1.10.3 + make ct-unit_transducers \ + base_rmq_ref=master \ + current_rmq_ref=$branch_or_tag_name \ + FULL= \ + FAIL_FAST=1 \ + SKIP_AS_ERROR=1 \ + CT_OPTS="-ct_hooks honeycomb_cth '[{directory,\"$PWD/honeycomb\"}]'" + - name: DOWNLOAD SECONDARY UMBRELLAS ARCHIVE + if: success() && 'oldest' == 'oldest' + uses: actions/download-artifact@v2 + with: + name: secondary-umbrellas.tar.xz + - name: UNPACK SECONDARY UMBRELLAS ARCHIVE + if: success() && 'oldest' == 'oldest' + run: | + set -ex + tar Jxf secondary-umbrellas.tar.xz + rm secondary-umbrellas.tar.xz + - name: RUN TESTS [mixed-versions] + if: success() && 'oldest' == 'oldest' + run: | + set -ex + branch_or_tag_name=${GITHUB_REF#refs/*/} + for umbrella in umbrellas/*; do + test -d "$umbrella" + printf '\n\033[1;32mMixing clusters with RabbitMQ %s\033[0m' \ + $(basename "$umbrella") + make distclean-ct ct-unit_transducers \ + base_rmq_ref=master \ + current_rmq_ref=$branch_or_tag_name \ + FULL= \ + FAIL_FAST=1 \ + SKIP_AS_ERROR=1 \ + SECONDARY_UMBRELLA=$PWD/$umbrella \ + RABBITMQ_FEATURE_FLAGS= \ + CT_OPTS="-ct_hooks honeycomb_cth '[{directory,\"$PWD/honeycomb\"}]'" + done + - name: ON FAILURE ARCHIVE TESTS LOGS + if: failure() + run: | + make ct-logs-archive + - name: ON FAILURE UPLOAD TESTS LOGS ARTIFACT + # https://github.com/marketplace/actions/upload-artifact + uses: actions/upload-artifact@v2-preview + if: failure() + with: + name: ct-unit_transducers-logs + path: "*-ct-logs-*.tar.xz" + - name: HONEYCOMB + if: success() || failure() + run: | + echo "$(ls honeycomb | wc -l) events recorded" + for f in honeycomb/*; do + RC=$(curl --silent \ + -H 'X-Honeycomb-Team: ${{ secrets.HONEYCOMB_TEAM }}' \ + -d @${f} \ + -o /dev/null \ + -w "%{http_code}" \ + "https://api.honeycomb.io/1/events/rabbitmq-ci") + if [ "$RC" != "200" ]; then + echo "Honeycomb returned ${RC}" + cat ${f} + printf "\n\n" + fi + done # vim:sw=2:et: ct-unit_vm_memory_monitor: needs: [checks] # https://help.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#contexts @@ -8539,6 +8633,7 @@ jobs: - ct-unit_queue_consumers - ct-unit_stats_and_metrics - ct-unit_supervisor2 + - ct-unit_transducers - ct-unit_vm_memory_monitor - ct-upgrade_preparation - ct-vhost diff --git a/.github/workflows/test-erlang-otp-22.3.yaml b/.github/workflows/test-erlang-otp-22.3.yaml index 70cbb10469..7da28868d3 100644 --- a/.github/workflows/test-erlang-otp-22.3.yaml +++ b/.github/workflows/test-erlang-otp-22.3.yaml @@ -8168,6 +8168,100 @@ jobs: printf "\n\n" fi done # vim:sw=2:et: + ct-unit_transducers: + needs: [checks] + # https://help.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#contexts + name: ct-unit_transducers + runs-on: ubuntu-18.04 + steps: + - name: CHECKOUT REPOSITORY + uses: actions/checkout@v2 + # https://github.com/marketplace/actions/setup-elixir + - name: CONFIGURE OTP & ELIXIR + uses: actions/setup-elixir@v1 + with: + otp-version: 22.3 + # https://github.com/elixir-lang/elixir/releases + elixir-version: 1.10.3 + - name: DOWNLOAD DEPS ARCHIVE + uses: actions/download-artifact@v2 + with: + name: deps.tar.xz + - name: UNPACK DEPS ARCHIVE + run: | + tar Jxf deps.tar.xz + - name: RUN TESTS + run: | + branch_or_tag_name=${GITHUB_REF#refs/*/} + ! test -d ebin || touch ebin/* + export BASE_RMQ_REF=master + export ERLANG_VERSION=22.3 + export ELIXIR_VERSION=1.10.3 + make ct-unit_transducers \ + base_rmq_ref=master \ + current_rmq_ref=$branch_or_tag_name \ + FULL= \ + FAIL_FAST=1 \ + SKIP_AS_ERROR=1 \ + CT_OPTS="-ct_hooks honeycomb_cth '[{directory,\"$PWD/honeycomb\"}]'" + - name: DOWNLOAD SECONDARY UMBRELLAS ARCHIVE + if: success() && 'latest' == 'oldest' + uses: actions/download-artifact@v2 + with: + name: secondary-umbrellas.tar.xz + - name: UNPACK SECONDARY UMBRELLAS ARCHIVE + if: success() && 'latest' == 'oldest' + run: | + set -ex + tar Jxf secondary-umbrellas.tar.xz + rm secondary-umbrellas.tar.xz + - name: RUN TESTS [mixed-versions] + if: success() && 'latest' == 'oldest' + run: | + set -ex + branch_or_tag_name=${GITHUB_REF#refs/*/} + for umbrella in umbrellas/*; do + test -d "$umbrella" + printf '\n\033[1;32mMixing clusters with RabbitMQ %s\033[0m' \ + $(basename "$umbrella") + make distclean-ct ct-unit_transducers \ + base_rmq_ref=master \ + current_rmq_ref=$branch_or_tag_name \ + FULL= \ + FAIL_FAST=1 \ + SKIP_AS_ERROR=1 \ + SECONDARY_UMBRELLA=$PWD/$umbrella \ + RABBITMQ_FEATURE_FLAGS= \ + CT_OPTS="-ct_hooks honeycomb_cth '[{directory,\"$PWD/honeycomb\"}]'" + done + - name: ON FAILURE ARCHIVE TESTS LOGS + if: failure() + run: | + make ct-logs-archive + - name: ON FAILURE UPLOAD TESTS LOGS ARTIFACT + # https://github.com/marketplace/actions/upload-artifact + uses: actions/upload-artifact@v2-preview + if: failure() + with: + name: ct-unit_transducers-logs + path: "*-ct-logs-*.tar.xz" + - name: HONEYCOMB + if: success() || failure() + run: | + echo "$(ls honeycomb | wc -l) events recorded" + for f in honeycomb/*; do + RC=$(curl --silent \ + -H 'X-Honeycomb-Team: ${{ secrets.HONEYCOMB_TEAM }}' \ + -d @${f} \ + -o /dev/null \ + -w "%{http_code}" \ + "https://api.honeycomb.io/1/events/rabbitmq-ci") + if [ "$RC" != "200" ]; then + echo "Honeycomb returned ${RC}" + cat ${f} + printf "\n\n" + fi + done # vim:sw=2:et: ct-unit_vm_memory_monitor: needs: [checks] # https://help.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#contexts @@ -8539,6 +8633,7 @@ jobs: - ct-unit_queue_consumers - ct-unit_stats_and_metrics - ct-unit_supervisor2 + - ct-unit_transducers - ct-unit_vm_memory_monitor - ct-upgrade_preparation - ct-vhost diff --git a/src/transducers.erl b/src/transducers.erl new file mode 100644 index 0000000000..cd5d12eddc --- /dev/null +++ b/src/transducers.erl @@ -0,0 +1,113 @@ +-module(transducers). + +-export([transduce/3, + transduce/4, + map/1, + map_worker_pool/2]). + +-export([worker_loop/3]). + +-type reducer() :: fun(({} | {any()} | {any(), any()}) -> any()). +-type transducer() :: fun((reducer()) -> reducer()). + +-spec transduce(transducer(), reducer(), list()) -> any(). +transduce(Xform, F, List) -> + transduce(Xform, F, F({}), List). + +-spec transduce(transducer(), reducer(), any(), list()) -> any(). +transduce(Xform, F, Init, List) -> + transduce0(List, Xform(F), Init). + +transduce0([], Xf, Acc) -> + Xf({Acc}); +transduce0([H | Rest], Xf, Acc) -> + transduce0(Rest, Xf, Xf({Acc, H})). + +-spec map(fun()) -> transducer(). +map(Fun) -> + fun (Rf) -> + fun + ({}) -> + Rf({}); + ({Result}) -> + Rf({Result}); + ({Result, Input}) -> + Rf({Result, Fun(Input)}) + end + end. + +%% worker_map +%% worker_map uses a worker pool to transform the values in the sequence. It may reorder entries. +-spec map_worker_pool(integer(), fun()) -> transducer(). +map_worker_pool(WorkerCount, Fun) -> + fun (Rf) -> + JobsOutstanding = counters:new(1, []), + WorkersEts = ets:new(?MODULE, []), + WorkerIds = lists:seq(1, WorkerCount), + [erlang:monitor(process, + spawn(?MODULE, + worker_loop, + [Id, Fun, self()])) + || Id <- WorkerIds], + [receive + {ready, Id, Pid} -> + ets:insert(WorkersEts, {Id, free, Pid}) + end || Id <- WorkerIds], + fun + ({}) -> + Rf({}); + ({Result}) -> + NewResult = (fun ReduceRemaining(0, Acc) -> + Acc; + ReduceRemaining(Count, Acc) -> + receive + {result, _, WorkerResult} -> + ReduceRemaining(Count - 1, + Rf({Acc, WorkerResult})) + end + end)(counters:get(JobsOutstanding, 1), Result), + ets:foldl(fun ({_, _, Pid}, _) -> Pid ! finish end, ok, WorkersEts), + ets:delete(WorkersEts), + Rf({NewResult}); + ({Result, Input}) -> + counters:add(JobsOutstanding, 1, 1), + case free_worker(WorkersEts) of + {FreeWorkerId, FreeWorkerPid} -> + %% io:format("Found free worker ~p with pid ~p~n", [FreeWorkerId, FreeWorkerPid]), + true = ets:update_element(WorkersEts, FreeWorkerId, {2, busy}), + FreeWorkerPid ! {work, self(), Input}, + Result; + none -> + %% io:format("No free workers...~n", []), + % should we check for dead workers? + receive + {result, WorkerId, WorkerResult} -> + counters:sub(JobsOutstanding, 1, 1), + true = ets:update_element(WorkersEts, WorkerId, {2, free}), + {FreeWorkerId, FreeWorkerPid} = free_worker(WorkersEts), + %% io:format("Found free worker ~p with pid ~p~n", [FreeWorkerId, FreeWorkerPid]), + true = ets:update_element(WorkersEts, FreeWorkerId, {2, busy}), + FreeWorkerPid ! {work, self(), Input}, + Rf({Result, WorkerResult}) + end + end + end + end. + +free_worker(WorkersEts) -> + case ets:match(WorkersEts, {'$1', free, '$2'}, 1) of + '$end_of_table' -> none; + {[[Id, Pid]], _} -> {Id, Pid} + end. + +worker_loop(Id, Fun, CoordinatorPid) -> + CoordinatorPid ! {ready, Id, self()}, + (fun WorkerLoop() -> + receive + {work, Dest, Input} -> + Dest ! {result, Id, Fun(Input)}, + WorkerLoop(); + finish -> + ok + end + end)(). diff --git a/test/unit_transducers_SUITE.erl b/test/unit_transducers_SUITE.erl new file mode 100644 index 0000000000..ec64a4aa38 --- /dev/null +++ b/test/unit_transducers_SUITE.erl @@ -0,0 +1,67 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(unit_transducers_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +-compile(export_all). + +all() -> + [ + {group, map_tests}, + {group, map_worker_pool_tests} + ]. + +groups() -> + [ + {map_tests, [parallel], [map_transducer]}, + {map_worker_pool_tests, [parallel], [map_worker_pool_transducer]} + ]. + +suite() -> + [ + {timetrap, {minutes, 1}} + ]. + +%% --------------------------------------------------------------------------- + +map_transducer(_Config) -> + Xf = transducers:map(fun (I) -> I + 1 end), + ?assertEqual([2, 3, 4], transducers:transduce(Xf, + fun into_list/1, + lists:seq(1, 3))). + +map_worker_pool_transducer(_Config) -> + Xf = transducers:map_worker_pool(2, fun (I) -> + timer:sleep(200), + I + I + end), + {Time, Value} = timer:tc(transducers, transduce, + [Xf, fun into_list/1, lists:seq(1, 3)]), + ?assertEqual([2, 4, 6], lists:sort(Value)), + ?assertMatch(T when T < 600000, Time, + "Should complete faster than serially"), + ?assertMatch(T when T > 400000, Time, + "But not faster than the worker count allows"). + +%% --------------------------------------------------------------------------- + +into_list({}) -> []; +into_list({Acc}) -> Acc; +into_list({Acc, I}) -> Acc ++ [I]. |