diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-04-02 12:26:28 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-04-03 16:32:32 +0100 |
| commit | 19f8dec83bfa899cf7d43e532ca8a52c73213a97 (patch) | |
| tree | 0876af93cb80b793c44c57110a33e48e55e5b68a | |
| parent | 74aac975655b4d419ac53c911768048234edbb9a (diff) | |
| download | rabbitmq-server-git-19f8dec83bfa899cf7d43e532ca8a52c73213a97.tar.gz | |
Fix in-memory counters
[#164735591]
| -rw-r--r-- | src/rabbit_fifo.erl | 113 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 77 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 71 |
3 files changed, 198 insertions, 63 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 0f957b4a19..d397459d4f 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -897,17 +897,20 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> {FullMsg = {_MsgId, {RaftIdxToDrop, {#{size := Bytes} = Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), - State1 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), + State2 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), State = case Msg of - 'empty' -> subtract_in_memory_counts(Header, State1); - _ -> State1 + 'empty' -> State2; + _ -> subtract_in_memory_counts(Header, State2) end, Effects = dead_letter_effects(maxlen, #{none => FullMsg}, State, Effects0), {State, Effects}; - {{'$prefix_msg', #{size := Bytes}}, State1} -> - State = add_bytes_drop(Bytes, State1), - {State, Effects0}; + {{'$prefix_msg', #{size := Bytes} = Header}, State1} -> + State2 = subtract_in_memory_counts(Header, add_bytes_drop(Bytes, State1)), + {State2, Effects0}; + {{'$empty_msg', #{size := Bytes}}, State1} -> + State2 = add_bytes_drop(Bytes, State1), + {State2, Effects0}; empty -> {State0, Effects0} end. @@ -926,10 +929,10 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, end, State = add_bytes_enqueue(Size, State1), State#?MODULE{messages = Messages#{NextMsgNum => Msg}, - % this is probably only done to record it when low_msg_num - % is undefined - low_msg_num = min(LowMsgNum, NextMsgNum), - next_msg_num = NextMsgNum + 1}. + %% this is probably only done to record it when low_msg_num + %% is undefined + low_msg_num = min(LowMsgNum, NextMsgNum), + next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, #?MODULE{ra_indexes = Indexes0} = State0) -> @@ -1016,6 +1019,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, fun({'$prefix_msg', _} = Msg, {S0, E0}) -> return_one(0, Msg, S0, E0, ConsumerId, Con); + ({'$empty_msg', _} = Msg, {S0, E0}) -> + return_one(0, Msg, S0, E0, + ConsumerId, Con); ({MsgNum, Msg}, {S0, E0}) -> return_one(MsgNum, Msg, S0, E0, ConsumerId, Con) @@ -1062,6 +1068,8 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> add_bytes_settle(Header, Acc); ({'$prefix_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$empty_msg', Header}, Acc) -> add_bytes_settle(Header, Acc) end, State0, maps:values(Discarded)), %% need to pass the length of discarded as $prefix_msgs would be filtered @@ -1128,14 +1136,14 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. -return_one(0, {'$prefix_msg', Header0}, +return_one(0, {Tag, Header0}, #?MODULE{returns = Returns, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId, Con) -> + Effects0, ConsumerId, Con) when Tag == '$prefix_msg'; Tag == '$empty_msg'-> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), - Msg = {'$prefix_msg', Header}, + Msg = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> Checked = Con#consumer.checked_out, @@ -1144,8 +1152,12 @@ return_one(0, {'$prefix_msg', Header0}, {add_bytes_settle(Header, State1), Effects}; _ -> %% this should not affect the release cursor in any way - {add_bytes_return(Msg, - State0#?MODULE{returns = lqueue:in(Msg, Returns)}), + State1 = case Tag of + '$empty_msg' -> State0; + _ -> add_in_memory_counts(maps:get(size, Header), State0) + end, + {add_bytes_return(Header, + State1#?MODULE{returns = lqueue:in(Msg, Returns)}), Effects0} end; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, @@ -1165,16 +1177,16 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, Checked = Con#consumer.checked_out, {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0), - State2 = case RawMsg of - 'empty' -> State1; - _ -> add_in_memory_counts(maps:get(size, Header), State1) - end, - {add_bytes_settle(Header, State2), Effects1}; + {add_bytes_settle(Header, State1), Effects1}; _ -> + State1 = case RawMsg of + 'empty' -> State0; + _ -> add_in_memory_counts(maps:get(size, Header), State0) + end, %% this should not affect the release cursor in any way - {add_bytes_return(RawMsg, - State0#?MODULE{returns = - lqueue:in({MsgNum, Msg}, Returns)}), + {add_bytes_return(Header, + State1#?MODULE{returns = + lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. @@ -1184,6 +1196,8 @@ return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> Checked = lists:sort(maps:to_list(Checked0)), lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> return_one(0, Msg, S, E, ConsumerId, Consumer); + ({_, {'$empty_msg', _} = Msg}, {S, E}) -> + return_one(0, Msg, S, E, ConsumerId, Consumer); ({_, {MsgNum, Msg}}, {S, E}) -> return_one(MsgNum, Msg, S, E, ConsumerId, Consumer) end, {State0, Effects0}, Checked). @@ -1269,6 +1283,9 @@ append_log_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages +take_next_msg(#?MODULE{prefix_msgs = {[{'$empty_msg', _} = Msg | Rem], P}} = State) -> + %% there are prefix returns, these should be served first + {Msg, State#?MODULE{prefix_msgs = {Rem, P}}}; take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) -> %% there are prefix returns, these should be served first {{'$prefix_msg', Header}, @@ -1301,10 +1318,16 @@ take_next_msg(#?MODULE{returns = Returns, end end; empty -> - [Header | Rem] = P, - %% There are prefix msgs - {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {R, Rem}}} + [Msg | Rem] = P, + case Msg of + {Header, 'empty'} -> + %% There are prefix msgs + {{'$empty_msg', Header}, + State#?MODULE{prefix_msgs = {R, Rem}}}; + Header -> + {{'$prefix_msg', Header}, + State#?MODULE{prefix_msgs = {R, Rem}}} + end end. send_msg_effect({CTag, CPid}, Msgs) -> @@ -1360,16 +1383,20 @@ checkout_one(#?MODULE{service_queue = SQ0, consumers = Cons}, {State, Msg} = case ConsumerMsg of - {'$prefix_msg', _} -> - {add_bytes_checkout(ConsumerMsg, State1), + {'$prefix_msg', Header} -> + {subtract_in_memory_counts( + Header, add_bytes_checkout(Header, State1)), + ConsumerMsg}; + {'$empty_msg', Header} -> + {add_bytes_checkout(Header, State1), ConsumerMsg}; {_, {_, {Header, 'empty'}} = M} -> - {add_bytes_checkout(maps:get(size, Header), State1), + {add_bytes_checkout(Header, State1), M}; - {_, {_, {Header, RawMsg} = M}} -> + {_, {_, {Header, _} = M}} -> {subtract_in_memory_counts( Header, - add_bytes_checkout(RawMsg, State1)), + add_bytes_checkout(Header, State1)), M} end, {success, ConsumerId, Next, Msg, State}; @@ -1483,13 +1510,19 @@ dehydrate_state(#?MODULE{messages = Messages, %% TODO: optimise this function as far as possible PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> [Header | Acc]; + ({'$empty_msg', _} = Msg, Acc) -> + [Msg | Acc]; + ({_, {_, {Header, 'empty'}}}, Acc) -> + [{'$empty_msg', Header} | Acc]; ({_, {_, {Header, _}}}, Acc) -> [Header | Acc] end, lists:reverse(PrefRet0), lqueue:to_list(Returns)), - PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) -> - [Header| Acc] + PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) -> + [Msg | Acc]; + ({_, {_RaftIdx, {Header, _}}}, Acc) -> + [Header | Acc] end, lists:reverse(PrefMsg0), lists:sort(maps:to_list(Messages))), @@ -1509,6 +1542,10 @@ dehydrate_state(#?MODULE{messages = Messages, dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> M; + (_, {'$empty_msg', _} = M) -> + M; + (_, {_, {_, {Header, 'empty'}}}) -> + {'$empty_msg', Header}; (_, {_, {_, {Header, _}}}) -> {'$prefix_msg', Header} end, Checked0), @@ -1569,18 +1606,16 @@ add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}. -add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout, +add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) -> - Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout + Bytes, msg_bytes_enqueue = Enqueue - Bytes}. add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) -> State#?MODULE{msg_bytes_checkout = Checkout - Bytes}. -add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout, +add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue} = State) -> - Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}. @@ -1600,6 +1635,8 @@ message_size(#basic_message{content = Content}) -> iolist_size(PFR); message_size({'$prefix_msg', #{size := B}}) -> B; +message_size({'$empty_msg', #{size := B}}) -> + B; message_size(B) when is_binary(B) -> byte_size(B); message_size(Msg) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 531c5a5aed..31d6a4c21b 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -120,7 +120,8 @@ all_tests() -> queue_length_in_memory_limit, queue_length_in_memory_bytes_limit_basic_get, queue_length_in_memory_bytes_limit_subscribe, - queue_length_in_memory_bytes_limit + queue_length_in_memory_bytes_limit, + in_memory ]. memory_tests() -> @@ -1819,7 +1820,7 @@ queue_length_limit_drop_head(Config) -> no_ack = true})). queue_length_in_memory_limit_basic_get(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1841,7 +1842,7 @@ queue_length_in_memory_limit_basic_get(Config) -> wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ?assertEqual([{1, byte_size(Msg1)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, @@ -1851,7 +1852,7 @@ queue_length_in_memory_limit_basic_get(Config) -> no_ack = true})). queue_length_in_memory_limit_subscribe(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1867,7 +1868,7 @@ queue_length_in_memory_limit_subscribe(Config) -> wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ?assertEqual([{1, byte_size(Msg1)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), subscribe(Ch, QQ, false), receive @@ -1878,7 +1879,7 @@ queue_length_in_memory_limit_subscribe(Config) -> multiple = false}) end, ?assertEqual([{0, 0}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), receive {#'basic.deliver'{delivery_tag = DeliveryTag2, redelivered = false}, @@ -1888,7 +1889,7 @@ queue_length_in_memory_limit_subscribe(Config) -> end. queue_length_in_memory_limit(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1908,8 +1909,8 @@ queue_length_in_memory_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), - + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})), @@ -1919,10 +1920,10 @@ queue_length_in_memory_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)). + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). queue_length_in_memory_bytes_limit_basic_get(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1944,7 +1945,7 @@ queue_length_in_memory_bytes_limit_basic_get(Config) -> wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ?assertEqual([{1, byte_size(Msg1)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, @@ -1954,7 +1955,7 @@ queue_length_in_memory_bytes_limit_basic_get(Config) -> no_ack = true})). queue_length_in_memory_bytes_limit_subscribe(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1970,7 +1971,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) -> wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ?assertEqual([{1, byte_size(Msg1)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), subscribe(Ch, QQ, false), receive @@ -1981,7 +1982,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) -> multiple = false}) end, ?assertEqual([{0, 0}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), receive {#'basic.deliver'{delivery_tag = DeliveryTag2, redelivered = false}, @@ -1991,7 +1992,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) -> end. queue_length_in_memory_bytes_limit(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -2011,7 +2012,7 @@ queue_length_in_memory_bytes_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, @@ -2022,7 +2023,47 @@ queue_length_in_memory_bytes_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)). + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + +in_memory(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + + publish(Ch, QQ, Msg1), + + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + subscribe(Ch, QQ, false), + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + publish(Ch, QQ, Msg2), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + end, + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). %%---------------------------------------------------------------------------- diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 76b8d7e715..58e6c2452c 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -38,7 +38,10 @@ all_tests() -> scenario14, scenario15, scenario16, - scenario17 + scenario17, + scenario18, + scenario19, + scenario20 ]. groups() -> @@ -293,15 +296,65 @@ scenario17(_Config) -> }, Commands), ok. +scenario18(_Config) -> + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_enqueue(E,3,<<"3">>), + make_enqueue(E,4,<<"4">>), + make_enqueue(E,5,<<"5">>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + %% max_length => 3, + max_in_memory_length => 1}, Commands), + ok. + +scenario19(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E,3,<<"3">>), + make_settle(C1, [0, 1]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_in_memory_bytes => 370, + max_in_memory_length => 1}, Commands), + ok. + +scenario20(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,2,simple_prefetch}), + {down, C1Pid, noconnection}, + make_enqueue(E,3,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,4,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,5,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,6,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,7,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0>>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 97, + max_in_memory_length => 1}, Commands), + ok. + snapshots(_Config) -> run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit}, - frequency([{10, {0, 0, false, 0}}, + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, + InMemoryBytes}, + frequency([{10, {0, 0, false, 0, 0, 0}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), boolean(), - oneof([range(1, 3), undefined]) + oneof([range(1, 3), undefined]), + oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]) }}]), ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)), collect({log_size, length(O)}, @@ -310,15 +363,19 @@ snapshots(_Config) -> Length, Bytes, SingleActiveConsumer, - DeliveryLimit), O)))) + DeliveryLimit, + InMemoryLength, + InMemoryBytes), O)))) end, [], 2500). -config(Name, Length, Bytes, SingleActive, DeliveryLimit) -> +config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), single_active_consumer_on => SingleActive, - delivery_limit => map_max(DeliveryLimit)}. + delivery_limit => map_max(DeliveryLimit), + max_in_memory_length => map_max(InMemoryLength), + max_in_memory_bytes => map_max(InMemoryBytes)}. map_max(0) -> undefined; map_max(N) -> N. |
