diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-10-12 15:17:52 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-10-12 15:17:52 +0100 |
| commit | 73bf86e97f63597da2bbd6567d5940d05d47924f (patch) | |
| tree | 6411324045efed9fc764227efdeb2ad2d0dc8987 | |
| parent | 8055e7d8059d9421e1232d226e5d9383c80734c1 (diff) | |
| parent | 4fa1d121f319b290e12beb8f36d2a7df8b99487f (diff) | |
| download | rabbitmq-server-git-73bf86e97f63597da2bbd6567d5940d05d47924f.tar.gz | |
Merge bug24477
| -rw-r--r-- | src/rabbit_msg_store.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 68 |
2 files changed, 56 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fc3cbebd4e..e4691b81d0 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1103,9 +1103,10 @@ record_pending_confirm(CRef, MsgId, State) -> client_confirm(CRef, MsgIds, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(MsgIds, ActionTaken), case dict:find(CRef, CTM) of - {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds), + {ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds), + ActionTaken), + MsgIds1 = gb_sets:difference(Gs, MsgIds), case gb_sets:is_empty(MsgIds1) of true -> dict:erase(CRef, CTM); false -> dict:store(CRef, MsgIds1, CTM) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d5dafd6402..5e034ae7df 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1837,27 +1837,34 @@ msg_store_client_init(MsgStore, Ref) -> rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). on_disk_capture() -> - on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}). -on_disk_capture({OnDisk, Awaiting, Pid}) -> - Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of - true -> Pid ! {self(), arrived}, undefined; - false -> Pid - end, receive - {await, MsgIds, Pid2} -> - true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting), - on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2}); - {on_disk, MsgIds} -> - on_disk_capture({gb_sets:union(OnDisk, MsgIds), - gb_sets:subtract(Awaiting, MsgIds), - Pid1}); + {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid); + stop -> done + end. + +on_disk_capture(OnDisk, Awaiting, Pid) -> + receive + {on_disk, MsgIdsS} -> + MsgIds = gb_sets:to_list(MsgIdsS), + on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds, + Pid); stop -> done + after 100 -> + case {OnDisk, Awaiting} of + {[], []} -> Pid ! {self(), arrived}, on_disk_capture(); + {_, []} -> Pid ! {self(), surplus}; + {[], _} -> Pid ! {self(), timeout}; + {_, _} -> Pid ! {self(), surplus_timeout} + end end. on_disk_await(Pid, MsgIds) when is_list(MsgIds) -> - Pid ! {await, gb_sets:from_list(MsgIds), self()}, - receive {Pid, arrived} -> ok end. + Pid ! {await, MsgIds, self()}, + receive + {Pid, arrived} -> ok; + {Pid, Error} -> Error + end. on_disk_stop(Pid) -> MRef = erlang:monitor(process, Pid), @@ -1922,6 +1929,8 @@ test_msg_store() -> ?PERSISTENT_MSG_STORE, Ref2), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, MsgIds, MSCState), + %% test confirm logic + passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState), %% publish the first half ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half @@ -2033,6 +2042,35 @@ test_msg_store() -> restart_msg_store_empty(), passed. +%% We want to test that writes that get eliminated due to removes still +%% get confirmed. Removes themselves do not. +test_msg_store_confirms(MsgIds, Cap, MSCState) -> + %% write -> confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% remove -> _ + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, []), + %% write, remove -> confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% write, remove, write -> confirmed, confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds ++ MsgIds), + %% remove, write -> confirmed + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% remove, write, remove -> confirmed + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + passed. + queue_name(Name) -> rabbit_misc:r(<<"/">>, queue, Name). |
