diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-11 22:40:22 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-11 22:40:22 +0100 |
| commit | 033fc53d7aba10cc70457c24fc712ee2325f7310 (patch) | |
| tree | 822be320f99b94f300cab126e294ef15ccc01a39 | |
| parent | 8055e7d8059d9421e1232d226e5d9383c80734c1 (diff) | |
| download | rabbitmq-server-git-033fc53d7aba10cc70457c24fc712ee2325f7310.tar.gz | |
make tests wait for *exactly* the right confirms, no more, no fewer
This does introduce a 100ms pause for every invocation; hopefully
enough for the msg_store to catch up and sync (the sync interval is
25ms) the usually small numbers of messages.
The confirm capture code now uses lists rather than gb_sets, as a
convenient 'bag' data structure. This can have O(n^2) perf, but we
only need to start worrying about that when the tests call this code
with thousands of msg ids, which currently they don't.
| -rw-r--r-- | src/rabbit_tests.erl | 37 |
1 files changed, 22 insertions, 15 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d5dafd6402..b8f3694ddd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1837,27 +1837,34 @@ msg_store_client_init(MsgStore, Ref) -> rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). on_disk_capture() -> - on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}). -on_disk_capture({OnDisk, Awaiting, Pid}) -> - Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of - true -> Pid ! {self(), arrived}, undefined; - false -> Pid - end, receive - {await, MsgIds, Pid2} -> - true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting), - on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2}); - {on_disk, MsgIds} -> - on_disk_capture({gb_sets:union(OnDisk, MsgIds), - gb_sets:subtract(Awaiting, MsgIds), - Pid1}); + {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid); + stop -> done + end. + +on_disk_capture(OnDisk, Awaiting, Pid) -> + receive + {on_disk, MsgIdsS} -> + MsgIds = gb_sets:to_list(MsgIdsS), + on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds, + Pid); stop -> done + after 100 -> + case {OnDisk, Awaiting} of + {[], []} -> Pid ! {self(), arrived}, on_disk_capture(); + {_, []} -> Pid ! {self(), surplus}; + {[], _} -> Pid ! {self(), timeout}; + {_, _} -> Pid ! {self(), surplus_timeout} + end end. on_disk_await(Pid, MsgIds) when is_list(MsgIds) -> - Pid ! {await, gb_sets:from_list(MsgIds), self()}, - receive {Pid, arrived} -> ok end. + Pid ! {await, MsgIds, self()}, + receive + {Pid, arrived} -> ok; + {Pid, Error} -> Error + end. on_disk_stop(Pid) -> MRef = erlang:monitor(process, Pid), |
